~zulcss/samba/server-dailies-3.4

« back to all changes in this revision

Viewing changes to source4/wrepl_server/wrepl_out_helpers.c

  • Committer: Chuck Short
  • Date: 2010-09-28 20:38:39 UTC
  • Revision ID: zulcss@ubuntu.com-20100928203839-pgjulytsi9ue63x1
Initial version

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* 
 
2
   Unix SMB/CIFS implementation.
 
3
   
 
4
   WINS Replication server
 
5
   
 
6
   Copyright (C) Stefan Metzmacher      2005
 
7
   
 
8
   This program is free software; you can redistribute it and/or modify
 
9
   it under the terms of the GNU General Public License as published by
 
10
   the Free Software Foundation; either version 3 of the License, or
 
11
   (at your option) any later version.
 
12
   
 
13
   This program is distributed in the hope that it will be useful,
 
14
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
15
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
16
   GNU General Public License for more details.
 
17
   
 
18
   You should have received a copy of the GNU General Public License
 
19
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
20
*/
 
21
 
 
22
#include "includes.h"
 
23
#include "lib/events/events.h"
 
24
#include "lib/socket/socket.h"
 
25
#include "smbd/service_task.h"
 
26
#include "smbd/service_stream.h"
 
27
#include "librpc/gen_ndr/winsrepl.h"
 
28
#include "wrepl_server/wrepl_server.h"
 
29
#include "nbt_server/wins/winsdb.h"
 
30
#include "libcli/composite/composite.h"
 
31
#include "libcli/wrepl/winsrepl.h"
 
32
#include "libcli/resolve/resolve.h"
 
33
#include "param/param.h"
 
34
 
 
35
enum wreplsrv_out_connect_stage {
 
36
        WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET,
 
37
        WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX,
 
38
        WREPLSRV_OUT_CONNECT_STAGE_DONE
 
39
};
 
40
 
 
41
struct wreplsrv_out_connect_state {
 
42
        enum wreplsrv_out_connect_stage stage;
 
43
        struct composite_context *c;
 
44
        struct wrepl_request *req;
 
45
        struct composite_context *c_req;
 
46
        struct wrepl_associate assoc_io;
 
47
        enum winsrepl_partner_type type;
 
48
        struct wreplsrv_out_connection *wreplconn;
 
49
};
 
50
 
 
51
static void wreplsrv_out_connect_handler_creq(struct composite_context *c_req);
 
52
static void wreplsrv_out_connect_handler_req(struct wrepl_request *req);
 
53
 
 
54
static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
 
55
{
 
56
        NTSTATUS status;
 
57
 
 
58
        status = wrepl_connect_recv(state->c_req);
 
59
        NT_STATUS_NOT_OK_RETURN(status);
 
60
 
 
61
        state->req = wrepl_associate_send(state->wreplconn->sock, &state->assoc_io);
 
62
        NT_STATUS_HAVE_NO_MEMORY(state->req);
 
63
 
 
64
        state->req->async.fn            = wreplsrv_out_connect_handler_req;
 
65
        state->req->async.private_data  = state;
 
66
 
 
67
        state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;
 
68
 
 
69
        return NT_STATUS_OK;
 
70
}
 
71
 
 
72
static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state *state)
 
73
{
 
74
        NTSTATUS status;
 
75
 
 
76
        status = wrepl_associate_recv(state->req, &state->assoc_io);
 
77
        NT_STATUS_NOT_OK_RETURN(status);
 
78
 
 
79
        state->wreplconn->assoc_ctx.peer_ctx = state->assoc_io.out.assoc_ctx;
 
80
        state->wreplconn->assoc_ctx.peer_major = state->assoc_io.out.major_version;
 
81
 
 
82
        if (state->type == WINSREPL_PARTNER_PUSH) {
 
83
                if (state->wreplconn->assoc_ctx.peer_major >= 5) {
 
84
                        state->wreplconn->partner->push.wreplconn = state->wreplconn;
 
85
                        talloc_steal(state->wreplconn->partner, state->wreplconn);
 
86
                } else {
 
87
                        state->type = WINSREPL_PARTNER_NONE;
 
88
                }
 
89
        } else if (state->type == WINSREPL_PARTNER_PULL) {
 
90
                state->wreplconn->partner->pull.wreplconn = state->wreplconn;
 
91
                talloc_steal(state->wreplconn->partner, state->wreplconn);
 
92
        }
 
93
 
 
94
        state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
 
95
 
 
96
        return NT_STATUS_OK;
 
97
}
 
98
 
 
99
static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state)
 
100
{
 
101
        struct composite_context *c = state->c;
 
102
 
 
103
        switch (state->stage) {
 
104
        case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET:
 
105
                c->status = wreplsrv_out_connect_wait_socket(state);
 
106
                break;
 
107
        case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX:
 
108
                c->status = wreplsrv_out_connect_wait_assoc_ctx(state);
 
109
                c->state  = COMPOSITE_STATE_DONE;
 
110
                break;
 
111
        case WREPLSRV_OUT_CONNECT_STAGE_DONE:
 
112
                c->status = NT_STATUS_INTERNAL_ERROR;
 
113
        }
 
114
 
 
115
        if (!NT_STATUS_IS_OK(c->status)) {
 
116
                c->state = COMPOSITE_STATE_ERROR;
 
117
        }
 
118
 
 
119
        if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
 
120
                c->async.fn(c);
 
121
        }
 
122
}
 
123
 
 
124
static void wreplsrv_out_connect_handler_creq(struct composite_context *creq)
 
125
{
 
126
        struct wreplsrv_out_connect_state *state = talloc_get_type(creq->async.private_data,
 
127
                                                   struct wreplsrv_out_connect_state);
 
128
        wreplsrv_out_connect_handler(state);
 
129
        return;
 
130
}
 
131
 
 
132
static void wreplsrv_out_connect_handler_req(struct wrepl_request *req)
 
133
{
 
134
        struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private_data,
 
135
                                                   struct wreplsrv_out_connect_state);
 
136
        wreplsrv_out_connect_handler(state);
 
137
        return;
 
138
}
 
139
 
 
140
static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
 
141
                                                           enum winsrepl_partner_type type,
 
142
                                                           struct wreplsrv_out_connection *wreplconn)
 
143
{
 
144
        struct composite_context *c = NULL;
 
145
        struct wreplsrv_service *service = partner->service;
 
146
        struct wreplsrv_out_connect_state *state = NULL;
 
147
        struct wreplsrv_out_connection **wreplconnp = &wreplconn;
 
148
        bool cached_connection = false;
 
149
 
 
150
        c = talloc_zero(partner, struct composite_context);
 
151
        if (!c) goto failed;
 
152
 
 
153
        state = talloc_zero(c, struct wreplsrv_out_connect_state);
 
154
        if (!state) goto failed;
 
155
        state->c        = c;
 
156
        state->type     = type;
 
157
 
 
158
        c->state        = COMPOSITE_STATE_IN_PROGRESS;
 
159
        c->event_ctx    = service->task->event_ctx;
 
160
        c->private_data = state;
 
161
 
 
162
        if (type == WINSREPL_PARTNER_PUSH) {
 
163
                cached_connection       = true;
 
164
                wreplconn               = partner->push.wreplconn;
 
165
                wreplconnp              = &partner->push.wreplconn;
 
166
        } else if (type == WINSREPL_PARTNER_PULL) {
 
167
                cached_connection       = true;
 
168
                wreplconn               = partner->pull.wreplconn;
 
169
                wreplconnp              = &partner->pull.wreplconn;
 
170
        }
 
171
 
 
172
        /* we have a connection already, so use it */
 
173
        if (wreplconn) {
 
174
                if (!wreplconn->sock->dead) {
 
175
                        state->stage    = WREPLSRV_OUT_CONNECT_STAGE_DONE;
 
176
                        state->wreplconn= wreplconn;
 
177
                        composite_done(c);
 
178
                        return c;
 
179
                } else if (!cached_connection) {
 
180
                        state->stage    = WREPLSRV_OUT_CONNECT_STAGE_DONE;
 
181
                        state->wreplconn= NULL;
 
182
                        composite_done(c);
 
183
                        return c;
 
184
                } else {
 
185
                        talloc_free(wreplconn);
 
186
                        *wreplconnp = NULL;
 
187
                }
 
188
        }
 
189
 
 
190
        wreplconn = talloc_zero(state, struct wreplsrv_out_connection);
 
191
        if (!wreplconn) goto failed;
 
192
 
 
193
        wreplconn->service      = service;
 
194
        wreplconn->partner      = partner;
 
195
        wreplconn->sock         = wrepl_socket_init(wreplconn, service->task->event_ctx, lp_iconv_convenience(service->task->lp_ctx));
 
196
        if (!wreplconn->sock) goto failed;
 
197
 
 
198
        state->stage    = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
 
199
        state->wreplconn= wreplconn;
 
200
        state->c_req    = wrepl_connect_send(wreplconn->sock,
 
201
                                             partner->our_address?partner->our_address:wrepl_best_ip(service->task->lp_ctx, partner->address),
 
202
                                             partner->address);
 
203
        if (!state->c_req) goto failed;
 
204
 
 
205
        state->c_req->async.fn                  = wreplsrv_out_connect_handler_creq;
 
206
        state->c_req->async.private_data        = state;
 
207
 
 
208
        return c;
 
209
failed:
 
210
        talloc_free(c);
 
211
        return NULL;
 
212
}
 
213
 
 
214
static NTSTATUS wreplsrv_out_connect_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
 
215
                                          struct wreplsrv_out_connection **wreplconn)
 
216
{
 
217
        NTSTATUS status;
 
218
 
 
219
        status = composite_wait(c);
 
220
 
 
221
        if (NT_STATUS_IS_OK(status)) {
 
222
                struct wreplsrv_out_connect_state *state = talloc_get_type(c->private_data,
 
223
                                                           struct wreplsrv_out_connect_state);
 
224
                if (state->wreplconn) {
 
225
                        *wreplconn = talloc_reference(mem_ctx, state->wreplconn);
 
226
                        if (!*wreplconn) status = NT_STATUS_NO_MEMORY;
 
227
                } else {
 
228
                        status = NT_STATUS_INVALID_CONNECTION;
 
229
                }
 
230
        }
 
231
 
 
232
        talloc_free(c);
 
233
        return status;
 
234
        
 
235
}
 
236
 
 
237
struct wreplsrv_pull_table_io {
 
238
        struct {
 
239
                struct wreplsrv_partner *partner;
 
240
                uint32_t num_owners;
 
241
                struct wrepl_wins_owner *owners;
 
242
        } in;
 
243
        struct {
 
244
                uint32_t num_owners;
 
245
                struct wrepl_wins_owner *owners;
 
246
        } out;
 
247
};
 
248
 
 
249
enum wreplsrv_pull_table_stage {
 
250
        WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION,
 
251
        WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY,
 
252
        WREPLSRV_PULL_TABLE_STAGE_DONE
 
253
};
 
254
 
 
255
struct wreplsrv_pull_table_state {
 
256
        enum wreplsrv_pull_table_stage stage;
 
257
        struct composite_context *c;
 
258
        struct wrepl_request *req;
 
259
        struct wrepl_pull_table table_io;
 
260
        struct wreplsrv_pull_table_io *io;
 
261
        struct composite_context *creq;
 
262
        struct wreplsrv_out_connection *wreplconn;
 
263
};
 
264
 
 
265
static void wreplsrv_pull_table_handler_req(struct wrepl_request *req);
 
266
 
 
267
static NTSTATUS wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state *state)
 
268
{
 
269
        NTSTATUS status;
 
270
 
 
271
        status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
 
272
        NT_STATUS_NOT_OK_RETURN(status);
 
273
 
 
274
        state->table_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
 
275
        state->req = wrepl_pull_table_send(state->wreplconn->sock, &state->table_io);
 
276
        NT_STATUS_HAVE_NO_MEMORY(state->req);
 
277
 
 
278
        state->req->async.fn            = wreplsrv_pull_table_handler_req;
 
279
        state->req->async.private_data  = state;
 
280
 
 
281
        state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY;
 
282
 
 
283
        return NT_STATUS_OK;
 
284
}
 
285
 
 
286
static NTSTATUS wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state *state)
 
287
{
 
288
        NTSTATUS status;
 
289
 
 
290
        status = wrepl_pull_table_recv(state->req, state, &state->table_io);
 
291
        NT_STATUS_NOT_OK_RETURN(status);
 
292
 
 
293
        state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
 
294
 
 
295
        return NT_STATUS_OK;
 
296
}
 
297
 
 
298
static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state *state)
 
299
{
 
300
        struct composite_context *c = state->c;
 
301
 
 
302
        switch (state->stage) {
 
303
        case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION:
 
304
                c->status = wreplsrv_pull_table_wait_connection(state);
 
305
                break;
 
306
        case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY:
 
307
                c->status = wreplsrv_pull_table_wait_table_reply(state);
 
308
                c->state  = COMPOSITE_STATE_DONE;
 
309
                break;
 
310
        case WREPLSRV_PULL_TABLE_STAGE_DONE:
 
311
                c->status = NT_STATUS_INTERNAL_ERROR;
 
312
        }
 
313
 
 
314
        if (!NT_STATUS_IS_OK(c->status)) {
 
315
                c->state = COMPOSITE_STATE_ERROR;
 
316
        }
 
317
 
 
318
        if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
 
319
                c->async.fn(c);
 
320
        }
 
321
}
 
322
 
 
323
static void wreplsrv_pull_table_handler_creq(struct composite_context *creq)
 
324
{
 
325
        struct wreplsrv_pull_table_state *state = talloc_get_type(creq->async.private_data,
 
326
                                                  struct wreplsrv_pull_table_state);
 
327
        wreplsrv_pull_table_handler(state);
 
328
        return;
 
329
}
 
330
 
 
331
static void wreplsrv_pull_table_handler_req(struct wrepl_request *req)
 
332
{
 
333
        struct wreplsrv_pull_table_state *state = talloc_get_type(req->async.private_data,
 
334
                                                  struct wreplsrv_pull_table_state);
 
335
        wreplsrv_pull_table_handler(state);
 
336
        return;
 
337
}
 
338
 
 
339
static struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
 
340
{
 
341
        struct composite_context *c = NULL;
 
342
        struct wreplsrv_service *service = io->in.partner->service;
 
343
        struct wreplsrv_pull_table_state *state = NULL;
 
344
 
 
345
        c = talloc_zero(mem_ctx, struct composite_context);
 
346
        if (!c) goto failed;
 
347
 
 
348
        state = talloc_zero(c, struct wreplsrv_pull_table_state);
 
349
        if (!state) goto failed;
 
350
        state->c        = c;
 
351
        state->io       = io;
 
352
 
 
353
        c->state        = COMPOSITE_STATE_IN_PROGRESS;
 
354
        c->event_ctx    = service->task->event_ctx;
 
355
        c->private_data = state;
 
356
 
 
357
        if (io->in.num_owners) {
 
358
                state->table_io.out.num_partners        = io->in.num_owners;
 
359
                state->table_io.out.partners            = io->in.owners;
 
360
                state->stage                            = WREPLSRV_PULL_TABLE_STAGE_DONE;
 
361
                composite_done(c);
 
362
                return c;
 
363
        }
 
364
 
 
365
        state->stage    = WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION;
 
366
        state->creq     = wreplsrv_out_connect_send(io->in.partner, WINSREPL_PARTNER_PULL, NULL);
 
367
        if (!state->creq) goto failed;
 
368
 
 
369
        state->creq->async.fn           = wreplsrv_pull_table_handler_creq;
 
370
        state->creq->async.private_data = state;
 
371
 
 
372
        return c;
 
373
failed:
 
374
        talloc_free(c);
 
375
        return NULL;
 
376
}
 
377
 
 
378
static NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
 
379
                                         struct wreplsrv_pull_table_io *io)
 
380
{
 
381
        NTSTATUS status;
 
382
 
 
383
        status = composite_wait(c);
 
384
 
 
385
        if (NT_STATUS_IS_OK(status)) {
 
386
                struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data,
 
387
                                                          struct wreplsrv_pull_table_state);
 
388
                io->out.num_owners      = state->table_io.out.num_partners;
 
389
                io->out.owners          = talloc_reference(mem_ctx, state->table_io.out.partners);
 
390
        }
 
391
 
 
392
        talloc_free(c);
 
393
        return status;  
 
394
}
 
395
 
 
396
struct wreplsrv_pull_names_io {
 
397
        struct {
 
398
                struct wreplsrv_partner *partner;
 
399
                struct wreplsrv_out_connection *wreplconn;
 
400
                struct wrepl_wins_owner owner;
 
401
        } in;
 
402
        struct {
 
403
                uint32_t num_names;
 
404
                struct wrepl_name *names;
 
405
        } out;
 
406
};
 
407
 
 
408
enum wreplsrv_pull_names_stage {
 
409
        WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION,
 
410
        WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY,
 
411
        WREPLSRV_PULL_NAMES_STAGE_DONE
 
412
};
 
413
 
 
414
struct wreplsrv_pull_names_state {
 
415
        enum wreplsrv_pull_names_stage stage;
 
416
        struct composite_context *c;
 
417
        struct wrepl_request *req;
 
418
        struct wrepl_pull_names pull_io;
 
419
        struct wreplsrv_pull_names_io *io;
 
420
        struct composite_context *creq;
 
421
        struct wreplsrv_out_connection *wreplconn;
 
422
};
 
423
 
 
424
static void wreplsrv_pull_names_handler_req(struct wrepl_request *req);
 
425
 
 
426
static NTSTATUS wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state *state)
 
427
{
 
428
        NTSTATUS status;
 
429
 
 
430
        status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
 
431
        NT_STATUS_NOT_OK_RETURN(status);
 
432
 
 
433
        state->pull_io.in.assoc_ctx     = state->wreplconn->assoc_ctx.peer_ctx;
 
434
        state->pull_io.in.partner       = state->io->in.owner;
 
435
        state->req = wrepl_pull_names_send(state->wreplconn->sock, &state->pull_io);
 
436
        NT_STATUS_HAVE_NO_MEMORY(state->req);
 
437
 
 
438
        state->req->async.fn            = wreplsrv_pull_names_handler_req;
 
439
        state->req->async.private_data  = state;
 
440
 
 
441
        state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY;
 
442
 
 
443
        return NT_STATUS_OK;
 
444
}
 
445
 
 
446
static NTSTATUS wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state *state)
 
447
{
 
448
        NTSTATUS status;
 
449
 
 
450
        status = wrepl_pull_names_recv(state->req, state, &state->pull_io);
 
451
        NT_STATUS_NOT_OK_RETURN(status);
 
452
 
 
453
        state->stage = WREPLSRV_PULL_NAMES_STAGE_DONE;
 
454
 
 
455
        return NT_STATUS_OK;
 
456
}
 
457
 
 
458
static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state *state)
 
459
{
 
460
        struct composite_context *c = state->c;
 
461
 
 
462
        switch (state->stage) {
 
463
        case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION:
 
464
                c->status = wreplsrv_pull_names_wait_connection(state);
 
465
                break;
 
466
        case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY:
 
467
                c->status = wreplsrv_pull_names_wait_send_reply(state);
 
468
                c->state  = COMPOSITE_STATE_DONE;
 
469
                break;
 
470
        case WREPLSRV_PULL_NAMES_STAGE_DONE:
 
471
                c->status = NT_STATUS_INTERNAL_ERROR;
 
472
        }
 
473
 
 
474
        if (!NT_STATUS_IS_OK(c->status)) {
 
475
                c->state = COMPOSITE_STATE_ERROR;
 
476
        }
 
477
 
 
478
        if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
 
479
                c->async.fn(c);
 
480
        }
 
481
}
 
482
 
 
483
static void wreplsrv_pull_names_handler_creq(struct composite_context *creq)
 
484
{
 
485
        struct wreplsrv_pull_names_state *state = talloc_get_type(creq->async.private_data,
 
486
                                                  struct wreplsrv_pull_names_state);
 
487
        wreplsrv_pull_names_handler(state);
 
488
        return;
 
489
}
 
490
 
 
491
static void wreplsrv_pull_names_handler_req(struct wrepl_request *req)
 
492
{
 
493
        struct wreplsrv_pull_names_state *state = talloc_get_type(req->async.private_data,
 
494
                                                  struct wreplsrv_pull_names_state);
 
495
        wreplsrv_pull_names_handler(state);
 
496
        return;
 
497
}
 
498
 
 
499
static struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
 
500
{
 
501
        struct composite_context *c = NULL;
 
502
        struct wreplsrv_service *service = io->in.partner->service;
 
503
        struct wreplsrv_pull_names_state *state = NULL;
 
504
        enum winsrepl_partner_type partner_type = WINSREPL_PARTNER_PULL;
 
505
 
 
506
        if (io->in.wreplconn) partner_type = WINSREPL_PARTNER_NONE;
 
507
 
 
508
        c = talloc_zero(mem_ctx, struct composite_context);
 
509
        if (!c) goto failed;
 
510
 
 
511
        state = talloc_zero(c, struct wreplsrv_pull_names_state);
 
512
        if (!state) goto failed;
 
513
        state->c        = c;
 
514
        state->io       = io;
 
515
 
 
516
        c->state        = COMPOSITE_STATE_IN_PROGRESS;
 
517
        c->event_ctx    = service->task->event_ctx;
 
518
        c->private_data = state;
 
519
 
 
520
        state->stage    = WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION;
 
521
        state->creq     = wreplsrv_out_connect_send(io->in.partner, partner_type, io->in.wreplconn);
 
522
        if (!state->creq) goto failed;
 
523
 
 
524
        state->creq->async.fn           = wreplsrv_pull_names_handler_creq;
 
525
        state->creq->async.private_data = state;
 
526
 
 
527
        return c;
 
528
failed:
 
529
        talloc_free(c);
 
530
        return NULL;
 
531
}
 
532
 
 
533
static NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
 
534
                                         struct wreplsrv_pull_names_io *io)
 
535
{
 
536
        NTSTATUS status;
 
537
 
 
538
        status = composite_wait(c);
 
539
 
 
540
        if (NT_STATUS_IS_OK(status)) {
 
541
                struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data,
 
542
                                                          struct wreplsrv_pull_names_state);
 
543
                io->out.num_names       = state->pull_io.out.num_names;
 
544
                io->out.names           = talloc_reference(mem_ctx, state->pull_io.out.names);
 
545
        }
 
546
 
 
547
        talloc_free(c);
 
548
        return status;
 
549
        
 
550
}
 
551
 
 
552
enum wreplsrv_pull_cycle_stage {
 
553
        WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY,
 
554
        WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES,
 
555
        WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC,
 
556
        WREPLSRV_PULL_CYCLE_STAGE_DONE
 
557
};
 
558
 
 
559
struct wreplsrv_pull_cycle_state {
 
560
        enum wreplsrv_pull_cycle_stage stage;
 
561
        struct composite_context *c;
 
562
        struct wreplsrv_pull_cycle_io *io;
 
563
        struct wreplsrv_pull_table_io table_io;
 
564
        uint32_t current;
 
565
        struct wreplsrv_pull_names_io names_io;
 
566
        struct composite_context *creq;
 
567
        struct wrepl_associate_stop assoc_stop_io;
 
568
        struct wrepl_request *req;
 
569
};
 
570
 
 
571
static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq);
 
572
static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req);
 
573
 
 
574
static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state)
 
575
{
 
576
        struct wreplsrv_owner *current_owner=NULL;
 
577
        struct wreplsrv_owner *local_owner;
 
578
        uint32_t i;
 
579
        uint64_t old_max_version = 0;
 
580
        bool do_pull = false;
 
581
 
 
582
        for (i=state->current; i < state->table_io.out.num_owners; i++) {
 
583
                current_owner = wreplsrv_find_owner(state->io->in.partner->service,
 
584
                                                    state->io->in.partner->pull.table,
 
585
                                                    state->table_io.out.owners[i].address);
 
586
 
 
587
                local_owner = wreplsrv_find_owner(state->io->in.partner->service,
 
588
                                                  state->io->in.partner->service->table,
 
589
                                                  state->table_io.out.owners[i].address);
 
590
                /*
 
591
                 * this means we are ourself the current owner,
 
592
                 * and we don't want replicate ourself
 
593
                 */
 
594
                if (!current_owner) continue;
 
595
 
 
596
                /*
 
597
                 * this means we don't have any records of this owner
 
598
                 * so fetch them
 
599
                 */
 
600
                if (!local_owner) {
 
601
                        do_pull         = true;
 
602
                        
 
603
                        break;
 
604
                }
 
605
 
 
606
                /*
 
607
                 * this means the remote partner has some new records of this owner
 
608
                 * fetch them
 
609
                 */
 
610
                if (current_owner->owner.max_version > local_owner->owner.max_version) {
 
611
                        do_pull         = true;
 
612
                        old_max_version = local_owner->owner.max_version;
 
613
                        break;
 
614
                }
 
615
        }
 
616
        state->current = i;
 
617
 
 
618
        if (do_pull) {
 
619
                state->names_io.in.partner              = state->io->in.partner;
 
620
                state->names_io.in.wreplconn            = state->io->in.wreplconn;
 
621
                state->names_io.in.owner                = current_owner->owner;
 
622
                state->names_io.in.owner.min_version    = old_max_version + 1;
 
623
                state->creq = wreplsrv_pull_names_send(state, &state->names_io);
 
624
                NT_STATUS_HAVE_NO_MEMORY(state->creq);
 
625
 
 
626
                state->creq->async.fn           = wreplsrv_pull_cycle_handler_creq;
 
627
                state->creq->async.private_data = state;
 
628
 
 
629
                return STATUS_MORE_ENTRIES;
 
630
        }
 
631
 
 
632
        return NT_STATUS_OK;
 
633
}
 
634
 
 
635
static NTSTATUS wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state *state)
 
636
{
 
637
        NTSTATUS status;
 
638
 
 
639
        status = wreplsrv_pull_cycle_next_owner_do_work(state);
 
640
        if (NT_STATUS_IS_OK(status)) {
 
641
                state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
 
642
        } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES, status)) {
 
643
                state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES;
 
644
                status = NT_STATUS_OK;
 
645
        }
 
646
 
 
647
        if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE && state->io->in.wreplconn) {
 
648
                state->assoc_stop_io.in.assoc_ctx       = state->io->in.wreplconn->assoc_ctx.peer_ctx;
 
649
                state->assoc_stop_io.in.reason          = 0;
 
650
                state->req = wrepl_associate_stop_send(state->io->in.wreplconn->sock, &state->assoc_stop_io);
 
651
                NT_STATUS_HAVE_NO_MEMORY(state->req);
 
652
 
 
653
                state->req->async.fn            = wreplsrv_pull_cycle_handler_req;
 
654
                state->req->async.private_data  = state;
 
655
 
 
656
                state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC;
 
657
        }
 
658
 
 
659
        return status;
 
660
}
 
661
 
 
662
static NTSTATUS wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state *state)
 
663
{
 
664
        NTSTATUS status;
 
665
        uint32_t i;
 
666
 
 
667
        status = wreplsrv_pull_table_recv(state->creq, state, &state->table_io);
 
668
        NT_STATUS_NOT_OK_RETURN(status);
 
669
 
 
670
        /* update partner table */
 
671
        for (i=0; i < state->table_io.out.num_owners; i++) {
 
672
                status = wreplsrv_add_table(state->io->in.partner->service,
 
673
                                            state->io->in.partner, 
 
674
                                            &state->io->in.partner->pull.table,
 
675
                                            state->table_io.out.owners[i].address,
 
676
                                            state->table_io.out.owners[i].max_version);
 
677
                NT_STATUS_NOT_OK_RETURN(status);
 
678
        }
 
679
 
 
680
        status = wreplsrv_pull_cycle_next_owner_wrapper(state);
 
681
        NT_STATUS_NOT_OK_RETURN(status);
 
682
 
 
683
        return status;
 
684
}
 
685
 
 
686
static NTSTATUS wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state *state)
 
687
{
 
688
        NTSTATUS status;
 
689
 
 
690
        status = wreplsrv_apply_records(state->io->in.partner,
 
691
                                        &state->names_io.in.owner,
 
692
                                        state->names_io.out.num_names,
 
693
                                        state->names_io.out.names);
 
694
        NT_STATUS_NOT_OK_RETURN(status);
 
695
 
 
696
        talloc_free(state->names_io.out.names);
 
697
        ZERO_STRUCT(state->names_io);
 
698
 
 
699
        return NT_STATUS_OK;
 
700
}
 
701
 
 
702
static NTSTATUS wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state *state)
 
703
{
 
704
        NTSTATUS status;
 
705
 
 
706
        status = wreplsrv_pull_names_recv(state->creq, state, &state->names_io);
 
707
        NT_STATUS_NOT_OK_RETURN(status);
 
708
 
 
709
        /*
 
710
         * TODO: this should maybe an async call,
 
711
         *       because we may need some network access
 
712
         *       for conflict resolving
 
713
         */
 
714
        status = wreplsrv_pull_cycle_apply_records(state);
 
715
        NT_STATUS_NOT_OK_RETURN(status);
 
716
 
 
717
        status = wreplsrv_pull_cycle_next_owner_wrapper(state);
 
718
        NT_STATUS_NOT_OK_RETURN(status);
 
719
 
 
720
        return status;
 
721
}
 
722
 
 
723
static NTSTATUS wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state *state)
 
724
{
 
725
        NTSTATUS status;
 
726
 
 
727
        status = wrepl_associate_stop_recv(state->req, &state->assoc_stop_io);
 
728
        NT_STATUS_NOT_OK_RETURN(status);
 
729
 
 
730
        state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
 
731
 
 
732
        return status;
 
733
}
 
734
 
 
735
static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state *state)
 
736
{
 
737
        struct composite_context *c = state->c;
 
738
 
 
739
        switch (state->stage) {
 
740
        case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY:
 
741
                c->status = wreplsrv_pull_cycle_wait_table_reply(state);
 
742
                break;
 
743
        case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES:
 
744
                c->status = wreplsrv_pull_cycle_wait_send_replies(state);
 
745
                break;
 
746
        case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC:
 
747
                c->status = wreplsrv_pull_cycle_wait_stop_assoc(state);
 
748
                break;
 
749
        case WREPLSRV_PULL_CYCLE_STAGE_DONE:
 
750
                c->status = NT_STATUS_INTERNAL_ERROR;
 
751
        }
 
752
 
 
753
        if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE) {
 
754
                c->state  = COMPOSITE_STATE_DONE;
 
755
        }
 
756
 
 
757
        if (!NT_STATUS_IS_OK(c->status)) {
 
758
                c->state = COMPOSITE_STATE_ERROR;
 
759
        }
 
760
 
 
761
        if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
 
762
                c->async.fn(c);
 
763
        }
 
764
}
 
765
 
 
766
static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq)
 
767
{
 
768
        struct wreplsrv_pull_cycle_state *state = talloc_get_type(creq->async.private_data,
 
769
                                                  struct wreplsrv_pull_cycle_state);
 
770
        wreplsrv_pull_cycle_handler(state);
 
771
        return;
 
772
}
 
773
 
 
774
static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req)
 
775
{
 
776
        struct wreplsrv_pull_cycle_state *state = talloc_get_type(req->async.private_data,
 
777
                                                  struct wreplsrv_pull_cycle_state);
 
778
        wreplsrv_pull_cycle_handler(state);
 
779
        return;
 
780
}
 
781
 
 
782
struct composite_context *wreplsrv_pull_cycle_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_cycle_io *io)
 
783
{
 
784
        struct composite_context *c = NULL;
 
785
        struct wreplsrv_service *service = io->in.partner->service;
 
786
        struct wreplsrv_pull_cycle_state *state = NULL;
 
787
 
 
788
        c = talloc_zero(mem_ctx, struct composite_context);
 
789
        if (!c) goto failed;
 
790
 
 
791
        state = talloc_zero(c, struct wreplsrv_pull_cycle_state);
 
792
        if (!state) goto failed;
 
793
        state->c        = c;
 
794
        state->io       = io;
 
795
 
 
796
        c->state        = COMPOSITE_STATE_IN_PROGRESS;
 
797
        c->event_ctx    = service->task->event_ctx;
 
798
        c->private_data = state;
 
799
 
 
800
        state->stage    = WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY;
 
801
        state->table_io.in.partner      = io->in.partner;
 
802
        state->table_io.in.num_owners   = io->in.num_owners;
 
803
        state->table_io.in.owners       = io->in.owners;
 
804
        state->creq = wreplsrv_pull_table_send(state, &state->table_io);
 
805
        if (!state->creq) goto failed;
 
806
 
 
807
        state->creq->async.fn           = wreplsrv_pull_cycle_handler_creq;
 
808
        state->creq->async.private_data = state;
 
809
 
 
810
        return c;
 
811
failed:
 
812
        talloc_free(c);
 
813
        return NULL;
 
814
}
 
815
 
 
816
NTSTATUS wreplsrv_pull_cycle_recv(struct composite_context *c)
 
817
{
 
818
        NTSTATUS status;
 
819
 
 
820
        status = composite_wait(c);
 
821
 
 
822
        talloc_free(c);
 
823
        return status;
 
824
}
 
825
 
 
826
enum wreplsrv_push_notify_stage {
 
827
        WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT,
 
828
        WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM,
 
829
        WREPLSRV_PUSH_NOTIFY_STAGE_DONE
 
830
};
 
831
 
 
832
struct wreplsrv_push_notify_state {
 
833
        enum wreplsrv_push_notify_stage stage;
 
834
        struct composite_context *c;
 
835
        struct wreplsrv_push_notify_io *io;
 
836
        enum wrepl_replication_cmd command;
 
837
        bool full_table;
 
838
        struct wrepl_send_ctrl ctrl;
 
839
        struct wrepl_request *req;
 
840
        struct wrepl_packet req_packet;
 
841
        struct wrepl_packet *rep_packet;
 
842
        struct composite_context *creq;
 
843
        struct wreplsrv_out_connection *wreplconn;
 
844
};
 
845
 
 
846
static void wreplsrv_push_notify_handler_creq(struct composite_context *creq);
 
847
static void wreplsrv_push_notify_handler_req(struct wrepl_request *req);
 
848
 
 
849
static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state)
 
850
{
 
851
        struct wreplsrv_service *service = state->io->in.partner->service;
 
852
        struct wrepl_packet *req = &state->req_packet;
 
853
        struct wrepl_replication *repl_out = &state->req_packet.message.replication;
 
854
        struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
 
855
        struct wreplsrv_in_connection *wrepl_in;
 
856
        NTSTATUS status;
 
857
        struct socket_context *sock;
 
858
        struct packet_context *packet;
 
859
        uint16_t fde_flags;
 
860
 
 
861
        /* prepare the outgoing request */
 
862
        req->opcode     = WREPL_OPCODE_BITS;
 
863
        req->assoc_ctx  = state->wreplconn->assoc_ctx.peer_ctx;
 
864
        req->mess_type  = WREPL_REPLICATION;
 
865
 
 
866
        repl_out->command = state->command;
 
867
 
 
868
        status = wreplsrv_fill_wrepl_table(service, state, table_out,
 
869
                                           service->wins_db->local_owner, state->full_table);
 
870
        NT_STATUS_NOT_OK_RETURN(status);
 
871
 
 
872
        /* queue the request */
 
873
        state->req = wrepl_request_send(state->wreplconn->sock, req, NULL);
 
874
        NT_STATUS_HAVE_NO_MEMORY(state->req);
 
875
 
 
876
        /*
 
877
         * now we need to convert the wrepl_socket (client connection)
 
878
         * into a wreplsrv_in_connection (server connection), because
 
879
         * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
 
880
         * message is received by the peer.
 
881
         */
 
882
 
 
883
        /* steal the socket_context */
 
884
        sock = state->wreplconn->sock->sock;
 
885
        state->wreplconn->sock->sock = NULL;
 
886
        talloc_steal(state, sock);
 
887
 
 
888
        /* 
 
889
         * steal the packet_context
 
890
         * note the request DATA_BLOB we just send on the
 
891
         * wrepl_socket (client connection) is still unter the 
 
892
         * packet context and will be send to the wire
 
893
         */
 
894
        packet = state->wreplconn->sock->packet;
 
895
        state->wreplconn->sock->packet = NULL;
 
896
        talloc_steal(state, packet);
 
897
 
 
898
        /*
 
899
         * get the fde_flags of the old fde event,
 
900
         * so that we can later set the same flags to the new one
 
901
         */
 
902
        fde_flags = event_get_fd_flags(state->wreplconn->sock->event.fde);
 
903
 
 
904
        /*
 
905
         * free the wrepl_socket (client connection)
 
906
         */
 
907
        talloc_free(state->wreplconn->sock);
 
908
        state->wreplconn->sock = NULL;
 
909
 
 
910
        /*
 
911
         * now create a wreplsrv_in_connection,
 
912
         * on which we act as server
 
913
         *
 
914
         * NOTE: sock and packet will be stolen by
 
915
         *       wreplsrv_in_connection_merge()
 
916
         */
 
917
        status = wreplsrv_in_connection_merge(state->io->in.partner,
 
918
                                              sock, packet, &wrepl_in);
 
919
        NT_STATUS_NOT_OK_RETURN(status);
 
920
 
 
921
        event_set_fd_flags(wrepl_in->conn->event.fde, fde_flags);
 
922
 
 
923
        wrepl_in->assoc_ctx.peer_ctx    = state->wreplconn->assoc_ctx.peer_ctx;
 
924
        wrepl_in->assoc_ctx.our_ctx     = 0;
 
925
 
 
926
        /* now we can free the wreplsrv_out_connection */
 
927
        talloc_free(state->wreplconn);
 
928
        state->wreplconn = NULL;
 
929
 
 
930
        state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
 
931
 
 
932
        return NT_STATUS_OK;
 
933
}
 
934
 
 
935
static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *state)
 
936
{
 
937
        struct wreplsrv_service *service = state->io->in.partner->service;
 
938
        struct wrepl_packet *req = &state->req_packet;
 
939
        struct wrepl_replication *repl_out = &state->req_packet.message.replication;
 
940
        struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
 
941
        NTSTATUS status;
 
942
 
 
943
        req->opcode     = WREPL_OPCODE_BITS;
 
944
        req->assoc_ctx  = state->wreplconn->assoc_ctx.peer_ctx;
 
945
        req->mess_type  = WREPL_REPLICATION;
 
946
 
 
947
        repl_out->command = state->command;
 
948
 
 
949
        status = wreplsrv_fill_wrepl_table(service, state, table_out,
 
950
                                           service->wins_db->local_owner, state->full_table);
 
951
        NT_STATUS_NOT_OK_RETURN(status);
 
952
 
 
953
        /* we won't get a reply to a inform message */
 
954
        state->ctrl.send_only           = true;
 
955
 
 
956
        state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl);
 
957
        NT_STATUS_HAVE_NO_MEMORY(state->req);
 
958
 
 
959
        state->req->async.fn            = wreplsrv_push_notify_handler_req;
 
960
        state->req->async.private_data  = state;
 
961
 
 
962
        state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM;
 
963
 
 
964
        return NT_STATUS_OK;
 
965
}
 
966
 
 
967
static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state *state)
 
968
{
 
969
        NTSTATUS status;
 
970
 
 
971
        status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
 
972
        NT_STATUS_NOT_OK_RETURN(status);
 
973
 
 
974
        /* is the peer doesn't support inform fallback to update */
 
975
        switch (state->command) {
 
976
        case WREPL_REPL_INFORM:
 
977
                if (state->wreplconn->assoc_ctx.peer_major < 5) {
 
978
                        state->command = WREPL_REPL_UPDATE;
 
979
                }
 
980
                break;
 
981
        case WREPL_REPL_INFORM2:
 
982
                if (state->wreplconn->assoc_ctx.peer_major < 5) {
 
983
                        state->command = WREPL_REPL_UPDATE2;
 
984
                }
 
985
                break;
 
986
        default:
 
987
                break;
 
988
        }
 
989
 
 
990
        switch (state->command) {
 
991
        case WREPL_REPL_UPDATE:
 
992
                state->full_table = true;
 
993
                return wreplsrv_push_notify_update(state);
 
994
        case WREPL_REPL_UPDATE2:
 
995
                state->full_table = false;
 
996
                return wreplsrv_push_notify_update(state);
 
997
        case WREPL_REPL_INFORM:
 
998
                state->full_table = true;
 
999
                return wreplsrv_push_notify_inform(state);
 
1000
        case WREPL_REPL_INFORM2:
 
1001
                state->full_table = false;
 
1002
                return wreplsrv_push_notify_inform(state);
 
1003
        default:
 
1004
                return NT_STATUS_INTERNAL_ERROR;
 
1005
        }
 
1006
 
 
1007
        return NT_STATUS_INTERNAL_ERROR;
 
1008
}
 
1009
 
 
1010
static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state *state)
 
1011
{
 
1012
        NTSTATUS status;
 
1013
 
 
1014
        status =  wrepl_request_recv(state->req, state, NULL);
 
1015
        NT_STATUS_NOT_OK_RETURN(status);
 
1016
 
 
1017
        state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
 
1018
        return status;
 
1019
}
 
1020
 
 
1021
static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *state)
 
1022
{
 
1023
        struct composite_context *c = state->c;
 
1024
 
 
1025
        switch (state->stage) {
 
1026
        case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT:
 
1027
                c->status = wreplsrv_push_notify_wait_connect(state);
 
1028
                break;
 
1029
        case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM:
 
1030
                c->status = wreplsrv_push_notify_wait_inform(state);
 
1031
                break;
 
1032
        case WREPLSRV_PUSH_NOTIFY_STAGE_DONE:
 
1033
                c->status = NT_STATUS_INTERNAL_ERROR;
 
1034
        }
 
1035
 
 
1036
        if (state->stage == WREPLSRV_PUSH_NOTIFY_STAGE_DONE) {
 
1037
                c->state  = COMPOSITE_STATE_DONE;
 
1038
        }
 
1039
 
 
1040
        if (!NT_STATUS_IS_OK(c->status)) {
 
1041
                c->state = COMPOSITE_STATE_ERROR;
 
1042
        }
 
1043
 
 
1044
        if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
 
1045
                c->async.fn(c);
 
1046
        }
 
1047
}
 
1048
 
 
1049
static void wreplsrv_push_notify_handler_creq(struct composite_context *creq)
 
1050
{
 
1051
        struct wreplsrv_push_notify_state *state = talloc_get_type(creq->async.private_data,
 
1052
                                                   struct wreplsrv_push_notify_state);
 
1053
        wreplsrv_push_notify_handler(state);
 
1054
        return;
 
1055
}
 
1056
 
 
1057
static void wreplsrv_push_notify_handler_req(struct wrepl_request *req)
 
1058
{
 
1059
        struct wreplsrv_push_notify_state *state = talloc_get_type(req->async.private_data,
 
1060
                                                   struct wreplsrv_push_notify_state);
 
1061
        wreplsrv_push_notify_handler(state);
 
1062
        return;
 
1063
}
 
1064
 
 
1065
struct composite_context *wreplsrv_push_notify_send(TALLOC_CTX *mem_ctx, struct wreplsrv_push_notify_io *io)
 
1066
{
 
1067
        struct composite_context *c = NULL;
 
1068
        struct wreplsrv_service *service = io->in.partner->service;
 
1069
        struct wreplsrv_push_notify_state *state = NULL;
 
1070
        enum winsrepl_partner_type partner_type;
 
1071
 
 
1072
        c = talloc_zero(mem_ctx, struct composite_context);
 
1073
        if (!c) goto failed;
 
1074
 
 
1075
        state = talloc_zero(c, struct wreplsrv_push_notify_state);
 
1076
        if (!state) goto failed;
 
1077
        state->c        = c;
 
1078
        state->io       = io;
 
1079
 
 
1080
        if (io->in.inform) {
 
1081
                /* we can cache the connection in partner->push->wreplconn */
 
1082
                partner_type = WINSREPL_PARTNER_PUSH;
 
1083
                if (io->in.propagate) {
 
1084
                        state->command  = WREPL_REPL_INFORM2;
 
1085
                } else {
 
1086
                        state->command  = WREPL_REPL_INFORM;
 
1087
                }
 
1088
        } else {
 
1089
                /* we can NOT cache the connection */
 
1090
                partner_type = WINSREPL_PARTNER_NONE;
 
1091
                if (io->in.propagate) {
 
1092
                        state->command  = WREPL_REPL_UPDATE2;
 
1093
                } else {
 
1094
                        state->command  = WREPL_REPL_UPDATE;
 
1095
                }       
 
1096
        }
 
1097
 
 
1098
        c->state        = COMPOSITE_STATE_IN_PROGRESS;
 
1099
        c->event_ctx    = service->task->event_ctx;
 
1100
        c->private_data = state;
 
1101
 
 
1102
        state->stage    = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT;
 
1103
        state->creq     = wreplsrv_out_connect_send(io->in.partner, partner_type, NULL);
 
1104
        if (!state->creq) goto failed;
 
1105
 
 
1106
        state->creq->async.fn           = wreplsrv_push_notify_handler_creq;
 
1107
        state->creq->async.private_data = state;
 
1108
 
 
1109
        return c;
 
1110
failed:
 
1111
        talloc_free(c);
 
1112
        return NULL;
 
1113
}
 
1114
 
 
1115
NTSTATUS wreplsrv_push_notify_recv(struct composite_context *c)
 
1116
{
 
1117
        NTSTATUS status;
 
1118
 
 
1119
        status = composite_wait(c);
 
1120
 
 
1121
        talloc_free(c);
 
1122
        return status;
 
1123
}