2
Unix SMB/CIFS implementation.
4
WINS Replication server
6
Copyright (C) Stefan Metzmacher 2005
8
This program is free software; you can redistribute it and/or modify
9
it under the terms of the GNU General Public License as published by
10
the Free Software Foundation; either version 3 of the License, or
11
(at your option) any later version.
13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16
GNU General Public License for more details.
18
You should have received a copy of the GNU General Public License
19
along with this program. If not, see <http://www.gnu.org/licenses/>.
23
#include "lib/events/events.h"
24
#include "lib/socket/socket.h"
25
#include "smbd/service_task.h"
26
#include "smbd/service_stream.h"
27
#include "librpc/gen_ndr/winsrepl.h"
28
#include "wrepl_server/wrepl_server.h"
29
#include "nbt_server/wins/winsdb.h"
30
#include "libcli/composite/composite.h"
31
#include "libcli/wrepl/winsrepl.h"
32
#include "libcli/resolve/resolve.h"
33
#include "param/param.h"
35
enum wreplsrv_out_connect_stage {
36
WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET,
37
WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX,
38
WREPLSRV_OUT_CONNECT_STAGE_DONE
41
struct wreplsrv_out_connect_state {
42
enum wreplsrv_out_connect_stage stage;
43
struct composite_context *c;
44
struct wrepl_request *req;
45
struct composite_context *c_req;
46
struct wrepl_associate assoc_io;
47
enum winsrepl_partner_type type;
48
struct wreplsrv_out_connection *wreplconn;
51
static void wreplsrv_out_connect_handler_creq(struct composite_context *c_req);
52
static void wreplsrv_out_connect_handler_req(struct wrepl_request *req);
54
static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
58
status = wrepl_connect_recv(state->c_req);
59
NT_STATUS_NOT_OK_RETURN(status);
61
state->req = wrepl_associate_send(state->wreplconn->sock, &state->assoc_io);
62
NT_STATUS_HAVE_NO_MEMORY(state->req);
64
state->req->async.fn = wreplsrv_out_connect_handler_req;
65
state->req->async.private_data = state;
67
state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;
72
static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state *state)
76
status = wrepl_associate_recv(state->req, &state->assoc_io);
77
NT_STATUS_NOT_OK_RETURN(status);
79
state->wreplconn->assoc_ctx.peer_ctx = state->assoc_io.out.assoc_ctx;
80
state->wreplconn->assoc_ctx.peer_major = state->assoc_io.out.major_version;
82
if (state->type == WINSREPL_PARTNER_PUSH) {
83
if (state->wreplconn->assoc_ctx.peer_major >= 5) {
84
state->wreplconn->partner->push.wreplconn = state->wreplconn;
85
talloc_steal(state->wreplconn->partner, state->wreplconn);
87
state->type = WINSREPL_PARTNER_NONE;
89
} else if (state->type == WINSREPL_PARTNER_PULL) {
90
state->wreplconn->partner->pull.wreplconn = state->wreplconn;
91
talloc_steal(state->wreplconn->partner, state->wreplconn);
94
state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
99
static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state)
101
struct composite_context *c = state->c;
103
switch (state->stage) {
104
case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET:
105
c->status = wreplsrv_out_connect_wait_socket(state);
107
case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX:
108
c->status = wreplsrv_out_connect_wait_assoc_ctx(state);
109
c->state = COMPOSITE_STATE_DONE;
111
case WREPLSRV_OUT_CONNECT_STAGE_DONE:
112
c->status = NT_STATUS_INTERNAL_ERROR;
115
if (!NT_STATUS_IS_OK(c->status)) {
116
c->state = COMPOSITE_STATE_ERROR;
119
if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
124
static void wreplsrv_out_connect_handler_creq(struct composite_context *creq)
126
struct wreplsrv_out_connect_state *state = talloc_get_type(creq->async.private_data,
127
struct wreplsrv_out_connect_state);
128
wreplsrv_out_connect_handler(state);
132
static void wreplsrv_out_connect_handler_req(struct wrepl_request *req)
134
struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private_data,
135
struct wreplsrv_out_connect_state);
136
wreplsrv_out_connect_handler(state);
140
static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
141
enum winsrepl_partner_type type,
142
struct wreplsrv_out_connection *wreplconn)
144
struct composite_context *c = NULL;
145
struct wreplsrv_service *service = partner->service;
146
struct wreplsrv_out_connect_state *state = NULL;
147
struct wreplsrv_out_connection **wreplconnp = &wreplconn;
148
bool cached_connection = false;
150
c = talloc_zero(partner, struct composite_context);
153
state = talloc_zero(c, struct wreplsrv_out_connect_state);
154
if (!state) goto failed;
158
c->state = COMPOSITE_STATE_IN_PROGRESS;
159
c->event_ctx = service->task->event_ctx;
160
c->private_data = state;
162
if (type == WINSREPL_PARTNER_PUSH) {
163
cached_connection = true;
164
wreplconn = partner->push.wreplconn;
165
wreplconnp = &partner->push.wreplconn;
166
} else if (type == WINSREPL_PARTNER_PULL) {
167
cached_connection = true;
168
wreplconn = partner->pull.wreplconn;
169
wreplconnp = &partner->pull.wreplconn;
172
/* we have a connection already, so use it */
174
if (!wreplconn->sock->dead) {
175
state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
176
state->wreplconn= wreplconn;
179
} else if (!cached_connection) {
180
state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
181
state->wreplconn= NULL;
185
talloc_free(wreplconn);
190
wreplconn = talloc_zero(state, struct wreplsrv_out_connection);
191
if (!wreplconn) goto failed;
193
wreplconn->service = service;
194
wreplconn->partner = partner;
195
wreplconn->sock = wrepl_socket_init(wreplconn, service->task->event_ctx, lp_iconv_convenience(service->task->lp_ctx));
196
if (!wreplconn->sock) goto failed;
198
state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
199
state->wreplconn= wreplconn;
200
state->c_req = wrepl_connect_send(wreplconn->sock,
201
partner->our_address?partner->our_address:wrepl_best_ip(service->task->lp_ctx, partner->address),
203
if (!state->c_req) goto failed;
205
state->c_req->async.fn = wreplsrv_out_connect_handler_creq;
206
state->c_req->async.private_data = state;
214
static NTSTATUS wreplsrv_out_connect_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
215
struct wreplsrv_out_connection **wreplconn)
219
status = composite_wait(c);
221
if (NT_STATUS_IS_OK(status)) {
222
struct wreplsrv_out_connect_state *state = talloc_get_type(c->private_data,
223
struct wreplsrv_out_connect_state);
224
if (state->wreplconn) {
225
*wreplconn = talloc_reference(mem_ctx, state->wreplconn);
226
if (!*wreplconn) status = NT_STATUS_NO_MEMORY;
228
status = NT_STATUS_INVALID_CONNECTION;
237
struct wreplsrv_pull_table_io {
239
struct wreplsrv_partner *partner;
241
struct wrepl_wins_owner *owners;
245
struct wrepl_wins_owner *owners;
249
enum wreplsrv_pull_table_stage {
250
WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION,
251
WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY,
252
WREPLSRV_PULL_TABLE_STAGE_DONE
255
struct wreplsrv_pull_table_state {
256
enum wreplsrv_pull_table_stage stage;
257
struct composite_context *c;
258
struct wrepl_request *req;
259
struct wrepl_pull_table table_io;
260
struct wreplsrv_pull_table_io *io;
261
struct composite_context *creq;
262
struct wreplsrv_out_connection *wreplconn;
265
static void wreplsrv_pull_table_handler_req(struct wrepl_request *req);
267
static NTSTATUS wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state *state)
271
status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
272
NT_STATUS_NOT_OK_RETURN(status);
274
state->table_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
275
state->req = wrepl_pull_table_send(state->wreplconn->sock, &state->table_io);
276
NT_STATUS_HAVE_NO_MEMORY(state->req);
278
state->req->async.fn = wreplsrv_pull_table_handler_req;
279
state->req->async.private_data = state;
281
state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY;
286
static NTSTATUS wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state *state)
290
status = wrepl_pull_table_recv(state->req, state, &state->table_io);
291
NT_STATUS_NOT_OK_RETURN(status);
293
state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
298
static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state *state)
300
struct composite_context *c = state->c;
302
switch (state->stage) {
303
case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION:
304
c->status = wreplsrv_pull_table_wait_connection(state);
306
case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY:
307
c->status = wreplsrv_pull_table_wait_table_reply(state);
308
c->state = COMPOSITE_STATE_DONE;
310
case WREPLSRV_PULL_TABLE_STAGE_DONE:
311
c->status = NT_STATUS_INTERNAL_ERROR;
314
if (!NT_STATUS_IS_OK(c->status)) {
315
c->state = COMPOSITE_STATE_ERROR;
318
if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
323
static void wreplsrv_pull_table_handler_creq(struct composite_context *creq)
325
struct wreplsrv_pull_table_state *state = talloc_get_type(creq->async.private_data,
326
struct wreplsrv_pull_table_state);
327
wreplsrv_pull_table_handler(state);
331
static void wreplsrv_pull_table_handler_req(struct wrepl_request *req)
333
struct wreplsrv_pull_table_state *state = talloc_get_type(req->async.private_data,
334
struct wreplsrv_pull_table_state);
335
wreplsrv_pull_table_handler(state);
339
static struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
341
struct composite_context *c = NULL;
342
struct wreplsrv_service *service = io->in.partner->service;
343
struct wreplsrv_pull_table_state *state = NULL;
345
c = talloc_zero(mem_ctx, struct composite_context);
348
state = talloc_zero(c, struct wreplsrv_pull_table_state);
349
if (!state) goto failed;
353
c->state = COMPOSITE_STATE_IN_PROGRESS;
354
c->event_ctx = service->task->event_ctx;
355
c->private_data = state;
357
if (io->in.num_owners) {
358
state->table_io.out.num_partners = io->in.num_owners;
359
state->table_io.out.partners = io->in.owners;
360
state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
365
state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION;
366
state->creq = wreplsrv_out_connect_send(io->in.partner, WINSREPL_PARTNER_PULL, NULL);
367
if (!state->creq) goto failed;
369
state->creq->async.fn = wreplsrv_pull_table_handler_creq;
370
state->creq->async.private_data = state;
378
static NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
379
struct wreplsrv_pull_table_io *io)
383
status = composite_wait(c);
385
if (NT_STATUS_IS_OK(status)) {
386
struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data,
387
struct wreplsrv_pull_table_state);
388
io->out.num_owners = state->table_io.out.num_partners;
389
io->out.owners = talloc_reference(mem_ctx, state->table_io.out.partners);
396
struct wreplsrv_pull_names_io {
398
struct wreplsrv_partner *partner;
399
struct wreplsrv_out_connection *wreplconn;
400
struct wrepl_wins_owner owner;
404
struct wrepl_name *names;
408
enum wreplsrv_pull_names_stage {
409
WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION,
410
WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY,
411
WREPLSRV_PULL_NAMES_STAGE_DONE
414
struct wreplsrv_pull_names_state {
415
enum wreplsrv_pull_names_stage stage;
416
struct composite_context *c;
417
struct wrepl_request *req;
418
struct wrepl_pull_names pull_io;
419
struct wreplsrv_pull_names_io *io;
420
struct composite_context *creq;
421
struct wreplsrv_out_connection *wreplconn;
424
static void wreplsrv_pull_names_handler_req(struct wrepl_request *req);
426
static NTSTATUS wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state *state)
430
status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
431
NT_STATUS_NOT_OK_RETURN(status);
433
state->pull_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
434
state->pull_io.in.partner = state->io->in.owner;
435
state->req = wrepl_pull_names_send(state->wreplconn->sock, &state->pull_io);
436
NT_STATUS_HAVE_NO_MEMORY(state->req);
438
state->req->async.fn = wreplsrv_pull_names_handler_req;
439
state->req->async.private_data = state;
441
state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY;
446
static NTSTATUS wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state *state)
450
status = wrepl_pull_names_recv(state->req, state, &state->pull_io);
451
NT_STATUS_NOT_OK_RETURN(status);
453
state->stage = WREPLSRV_PULL_NAMES_STAGE_DONE;
458
static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state *state)
460
struct composite_context *c = state->c;
462
switch (state->stage) {
463
case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION:
464
c->status = wreplsrv_pull_names_wait_connection(state);
466
case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY:
467
c->status = wreplsrv_pull_names_wait_send_reply(state);
468
c->state = COMPOSITE_STATE_DONE;
470
case WREPLSRV_PULL_NAMES_STAGE_DONE:
471
c->status = NT_STATUS_INTERNAL_ERROR;
474
if (!NT_STATUS_IS_OK(c->status)) {
475
c->state = COMPOSITE_STATE_ERROR;
478
if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
483
static void wreplsrv_pull_names_handler_creq(struct composite_context *creq)
485
struct wreplsrv_pull_names_state *state = talloc_get_type(creq->async.private_data,
486
struct wreplsrv_pull_names_state);
487
wreplsrv_pull_names_handler(state);
491
static void wreplsrv_pull_names_handler_req(struct wrepl_request *req)
493
struct wreplsrv_pull_names_state *state = talloc_get_type(req->async.private_data,
494
struct wreplsrv_pull_names_state);
495
wreplsrv_pull_names_handler(state);
499
static struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
501
struct composite_context *c = NULL;
502
struct wreplsrv_service *service = io->in.partner->service;
503
struct wreplsrv_pull_names_state *state = NULL;
504
enum winsrepl_partner_type partner_type = WINSREPL_PARTNER_PULL;
506
if (io->in.wreplconn) partner_type = WINSREPL_PARTNER_NONE;
508
c = talloc_zero(mem_ctx, struct composite_context);
511
state = talloc_zero(c, struct wreplsrv_pull_names_state);
512
if (!state) goto failed;
516
c->state = COMPOSITE_STATE_IN_PROGRESS;
517
c->event_ctx = service->task->event_ctx;
518
c->private_data = state;
520
state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION;
521
state->creq = wreplsrv_out_connect_send(io->in.partner, partner_type, io->in.wreplconn);
522
if (!state->creq) goto failed;
524
state->creq->async.fn = wreplsrv_pull_names_handler_creq;
525
state->creq->async.private_data = state;
533
static NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
534
struct wreplsrv_pull_names_io *io)
538
status = composite_wait(c);
540
if (NT_STATUS_IS_OK(status)) {
541
struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data,
542
struct wreplsrv_pull_names_state);
543
io->out.num_names = state->pull_io.out.num_names;
544
io->out.names = talloc_reference(mem_ctx, state->pull_io.out.names);
552
enum wreplsrv_pull_cycle_stage {
553
WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY,
554
WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES,
555
WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC,
556
WREPLSRV_PULL_CYCLE_STAGE_DONE
559
struct wreplsrv_pull_cycle_state {
560
enum wreplsrv_pull_cycle_stage stage;
561
struct composite_context *c;
562
struct wreplsrv_pull_cycle_io *io;
563
struct wreplsrv_pull_table_io table_io;
565
struct wreplsrv_pull_names_io names_io;
566
struct composite_context *creq;
567
struct wrepl_associate_stop assoc_stop_io;
568
struct wrepl_request *req;
571
static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq);
572
static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req);
574
static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state)
576
struct wreplsrv_owner *current_owner=NULL;
577
struct wreplsrv_owner *local_owner;
579
uint64_t old_max_version = 0;
580
bool do_pull = false;
582
for (i=state->current; i < state->table_io.out.num_owners; i++) {
583
current_owner = wreplsrv_find_owner(state->io->in.partner->service,
584
state->io->in.partner->pull.table,
585
state->table_io.out.owners[i].address);
587
local_owner = wreplsrv_find_owner(state->io->in.partner->service,
588
state->io->in.partner->service->table,
589
state->table_io.out.owners[i].address);
591
* this means we are ourself the current owner,
592
* and we don't want replicate ourself
594
if (!current_owner) continue;
597
* this means we don't have any records of this owner
607
* this means the remote partner has some new records of this owner
610
if (current_owner->owner.max_version > local_owner->owner.max_version) {
612
old_max_version = local_owner->owner.max_version;
619
state->names_io.in.partner = state->io->in.partner;
620
state->names_io.in.wreplconn = state->io->in.wreplconn;
621
state->names_io.in.owner = current_owner->owner;
622
state->names_io.in.owner.min_version = old_max_version + 1;
623
state->creq = wreplsrv_pull_names_send(state, &state->names_io);
624
NT_STATUS_HAVE_NO_MEMORY(state->creq);
626
state->creq->async.fn = wreplsrv_pull_cycle_handler_creq;
627
state->creq->async.private_data = state;
629
return STATUS_MORE_ENTRIES;
635
static NTSTATUS wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state *state)
639
status = wreplsrv_pull_cycle_next_owner_do_work(state);
640
if (NT_STATUS_IS_OK(status)) {
641
state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
642
} else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES, status)) {
643
state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES;
644
status = NT_STATUS_OK;
647
if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE && state->io->in.wreplconn) {
648
state->assoc_stop_io.in.assoc_ctx = state->io->in.wreplconn->assoc_ctx.peer_ctx;
649
state->assoc_stop_io.in.reason = 0;
650
state->req = wrepl_associate_stop_send(state->io->in.wreplconn->sock, &state->assoc_stop_io);
651
NT_STATUS_HAVE_NO_MEMORY(state->req);
653
state->req->async.fn = wreplsrv_pull_cycle_handler_req;
654
state->req->async.private_data = state;
656
state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC;
662
static NTSTATUS wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state *state)
667
status = wreplsrv_pull_table_recv(state->creq, state, &state->table_io);
668
NT_STATUS_NOT_OK_RETURN(status);
670
/* update partner table */
671
for (i=0; i < state->table_io.out.num_owners; i++) {
672
status = wreplsrv_add_table(state->io->in.partner->service,
673
state->io->in.partner,
674
&state->io->in.partner->pull.table,
675
state->table_io.out.owners[i].address,
676
state->table_io.out.owners[i].max_version);
677
NT_STATUS_NOT_OK_RETURN(status);
680
status = wreplsrv_pull_cycle_next_owner_wrapper(state);
681
NT_STATUS_NOT_OK_RETURN(status);
686
static NTSTATUS wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state *state)
690
status = wreplsrv_apply_records(state->io->in.partner,
691
&state->names_io.in.owner,
692
state->names_io.out.num_names,
693
state->names_io.out.names);
694
NT_STATUS_NOT_OK_RETURN(status);
696
talloc_free(state->names_io.out.names);
697
ZERO_STRUCT(state->names_io);
702
static NTSTATUS wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state *state)
706
status = wreplsrv_pull_names_recv(state->creq, state, &state->names_io);
707
NT_STATUS_NOT_OK_RETURN(status);
710
* TODO: this should maybe an async call,
711
* because we may need some network access
712
* for conflict resolving
714
status = wreplsrv_pull_cycle_apply_records(state);
715
NT_STATUS_NOT_OK_RETURN(status);
717
status = wreplsrv_pull_cycle_next_owner_wrapper(state);
718
NT_STATUS_NOT_OK_RETURN(status);
723
static NTSTATUS wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state *state)
727
status = wrepl_associate_stop_recv(state->req, &state->assoc_stop_io);
728
NT_STATUS_NOT_OK_RETURN(status);
730
state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
735
static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state *state)
737
struct composite_context *c = state->c;
739
switch (state->stage) {
740
case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY:
741
c->status = wreplsrv_pull_cycle_wait_table_reply(state);
743
case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES:
744
c->status = wreplsrv_pull_cycle_wait_send_replies(state);
746
case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC:
747
c->status = wreplsrv_pull_cycle_wait_stop_assoc(state);
749
case WREPLSRV_PULL_CYCLE_STAGE_DONE:
750
c->status = NT_STATUS_INTERNAL_ERROR;
753
if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE) {
754
c->state = COMPOSITE_STATE_DONE;
757
if (!NT_STATUS_IS_OK(c->status)) {
758
c->state = COMPOSITE_STATE_ERROR;
761
if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
766
static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq)
768
struct wreplsrv_pull_cycle_state *state = talloc_get_type(creq->async.private_data,
769
struct wreplsrv_pull_cycle_state);
770
wreplsrv_pull_cycle_handler(state);
774
static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req)
776
struct wreplsrv_pull_cycle_state *state = talloc_get_type(req->async.private_data,
777
struct wreplsrv_pull_cycle_state);
778
wreplsrv_pull_cycle_handler(state);
782
struct composite_context *wreplsrv_pull_cycle_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_cycle_io *io)
784
struct composite_context *c = NULL;
785
struct wreplsrv_service *service = io->in.partner->service;
786
struct wreplsrv_pull_cycle_state *state = NULL;
788
c = talloc_zero(mem_ctx, struct composite_context);
791
state = talloc_zero(c, struct wreplsrv_pull_cycle_state);
792
if (!state) goto failed;
796
c->state = COMPOSITE_STATE_IN_PROGRESS;
797
c->event_ctx = service->task->event_ctx;
798
c->private_data = state;
800
state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY;
801
state->table_io.in.partner = io->in.partner;
802
state->table_io.in.num_owners = io->in.num_owners;
803
state->table_io.in.owners = io->in.owners;
804
state->creq = wreplsrv_pull_table_send(state, &state->table_io);
805
if (!state->creq) goto failed;
807
state->creq->async.fn = wreplsrv_pull_cycle_handler_creq;
808
state->creq->async.private_data = state;
816
NTSTATUS wreplsrv_pull_cycle_recv(struct composite_context *c)
820
status = composite_wait(c);
826
enum wreplsrv_push_notify_stage {
827
WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT,
828
WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM,
829
WREPLSRV_PUSH_NOTIFY_STAGE_DONE
832
struct wreplsrv_push_notify_state {
833
enum wreplsrv_push_notify_stage stage;
834
struct composite_context *c;
835
struct wreplsrv_push_notify_io *io;
836
enum wrepl_replication_cmd command;
838
struct wrepl_send_ctrl ctrl;
839
struct wrepl_request *req;
840
struct wrepl_packet req_packet;
841
struct wrepl_packet *rep_packet;
842
struct composite_context *creq;
843
struct wreplsrv_out_connection *wreplconn;
846
static void wreplsrv_push_notify_handler_creq(struct composite_context *creq);
847
static void wreplsrv_push_notify_handler_req(struct wrepl_request *req);
849
static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state)
851
struct wreplsrv_service *service = state->io->in.partner->service;
852
struct wrepl_packet *req = &state->req_packet;
853
struct wrepl_replication *repl_out = &state->req_packet.message.replication;
854
struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
855
struct wreplsrv_in_connection *wrepl_in;
857
struct socket_context *sock;
858
struct packet_context *packet;
861
/* prepare the outgoing request */
862
req->opcode = WREPL_OPCODE_BITS;
863
req->assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
864
req->mess_type = WREPL_REPLICATION;
866
repl_out->command = state->command;
868
status = wreplsrv_fill_wrepl_table(service, state, table_out,
869
service->wins_db->local_owner, state->full_table);
870
NT_STATUS_NOT_OK_RETURN(status);
872
/* queue the request */
873
state->req = wrepl_request_send(state->wreplconn->sock, req, NULL);
874
NT_STATUS_HAVE_NO_MEMORY(state->req);
877
* now we need to convert the wrepl_socket (client connection)
878
* into a wreplsrv_in_connection (server connection), because
879
* we'll act as a server on this connection after the WREPL_REPL_UPDATE*
880
* message is received by the peer.
883
/* steal the socket_context */
884
sock = state->wreplconn->sock->sock;
885
state->wreplconn->sock->sock = NULL;
886
talloc_steal(state, sock);
889
* steal the packet_context
890
* note the request DATA_BLOB we just send on the
891
* wrepl_socket (client connection) is still unter the
892
* packet context and will be send to the wire
894
packet = state->wreplconn->sock->packet;
895
state->wreplconn->sock->packet = NULL;
896
talloc_steal(state, packet);
899
* get the fde_flags of the old fde event,
900
* so that we can later set the same flags to the new one
902
fde_flags = event_get_fd_flags(state->wreplconn->sock->event.fde);
905
* free the wrepl_socket (client connection)
907
talloc_free(state->wreplconn->sock);
908
state->wreplconn->sock = NULL;
911
* now create a wreplsrv_in_connection,
912
* on which we act as server
914
* NOTE: sock and packet will be stolen by
915
* wreplsrv_in_connection_merge()
917
status = wreplsrv_in_connection_merge(state->io->in.partner,
918
sock, packet, &wrepl_in);
919
NT_STATUS_NOT_OK_RETURN(status);
921
event_set_fd_flags(wrepl_in->conn->event.fde, fde_flags);
923
wrepl_in->assoc_ctx.peer_ctx = state->wreplconn->assoc_ctx.peer_ctx;
924
wrepl_in->assoc_ctx.our_ctx = 0;
926
/* now we can free the wreplsrv_out_connection */
927
talloc_free(state->wreplconn);
928
state->wreplconn = NULL;
930
state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
935
static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *state)
937
struct wreplsrv_service *service = state->io->in.partner->service;
938
struct wrepl_packet *req = &state->req_packet;
939
struct wrepl_replication *repl_out = &state->req_packet.message.replication;
940
struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
943
req->opcode = WREPL_OPCODE_BITS;
944
req->assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
945
req->mess_type = WREPL_REPLICATION;
947
repl_out->command = state->command;
949
status = wreplsrv_fill_wrepl_table(service, state, table_out,
950
service->wins_db->local_owner, state->full_table);
951
NT_STATUS_NOT_OK_RETURN(status);
953
/* we won't get a reply to a inform message */
954
state->ctrl.send_only = true;
956
state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl);
957
NT_STATUS_HAVE_NO_MEMORY(state->req);
959
state->req->async.fn = wreplsrv_push_notify_handler_req;
960
state->req->async.private_data = state;
962
state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM;
967
static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state *state)
971
status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
972
NT_STATUS_NOT_OK_RETURN(status);
974
/* is the peer doesn't support inform fallback to update */
975
switch (state->command) {
976
case WREPL_REPL_INFORM:
977
if (state->wreplconn->assoc_ctx.peer_major < 5) {
978
state->command = WREPL_REPL_UPDATE;
981
case WREPL_REPL_INFORM2:
982
if (state->wreplconn->assoc_ctx.peer_major < 5) {
983
state->command = WREPL_REPL_UPDATE2;
990
switch (state->command) {
991
case WREPL_REPL_UPDATE:
992
state->full_table = true;
993
return wreplsrv_push_notify_update(state);
994
case WREPL_REPL_UPDATE2:
995
state->full_table = false;
996
return wreplsrv_push_notify_update(state);
997
case WREPL_REPL_INFORM:
998
state->full_table = true;
999
return wreplsrv_push_notify_inform(state);
1000
case WREPL_REPL_INFORM2:
1001
state->full_table = false;
1002
return wreplsrv_push_notify_inform(state);
1004
return NT_STATUS_INTERNAL_ERROR;
1007
return NT_STATUS_INTERNAL_ERROR;
1010
static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state *state)
1014
status = wrepl_request_recv(state->req, state, NULL);
1015
NT_STATUS_NOT_OK_RETURN(status);
1017
state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
1021
static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *state)
1023
struct composite_context *c = state->c;
1025
switch (state->stage) {
1026
case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT:
1027
c->status = wreplsrv_push_notify_wait_connect(state);
1029
case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM:
1030
c->status = wreplsrv_push_notify_wait_inform(state);
1032
case WREPLSRV_PUSH_NOTIFY_STAGE_DONE:
1033
c->status = NT_STATUS_INTERNAL_ERROR;
1036
if (state->stage == WREPLSRV_PUSH_NOTIFY_STAGE_DONE) {
1037
c->state = COMPOSITE_STATE_DONE;
1040
if (!NT_STATUS_IS_OK(c->status)) {
1041
c->state = COMPOSITE_STATE_ERROR;
1044
if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
1049
static void wreplsrv_push_notify_handler_creq(struct composite_context *creq)
1051
struct wreplsrv_push_notify_state *state = talloc_get_type(creq->async.private_data,
1052
struct wreplsrv_push_notify_state);
1053
wreplsrv_push_notify_handler(state);
1057
static void wreplsrv_push_notify_handler_req(struct wrepl_request *req)
1059
struct wreplsrv_push_notify_state *state = talloc_get_type(req->async.private_data,
1060
struct wreplsrv_push_notify_state);
1061
wreplsrv_push_notify_handler(state);
1065
struct composite_context *wreplsrv_push_notify_send(TALLOC_CTX *mem_ctx, struct wreplsrv_push_notify_io *io)
1067
struct composite_context *c = NULL;
1068
struct wreplsrv_service *service = io->in.partner->service;
1069
struct wreplsrv_push_notify_state *state = NULL;
1070
enum winsrepl_partner_type partner_type;
1072
c = talloc_zero(mem_ctx, struct composite_context);
1073
if (!c) goto failed;
1075
state = talloc_zero(c, struct wreplsrv_push_notify_state);
1076
if (!state) goto failed;
1080
if (io->in.inform) {
1081
/* we can cache the connection in partner->push->wreplconn */
1082
partner_type = WINSREPL_PARTNER_PUSH;
1083
if (io->in.propagate) {
1084
state->command = WREPL_REPL_INFORM2;
1086
state->command = WREPL_REPL_INFORM;
1089
/* we can NOT cache the connection */
1090
partner_type = WINSREPL_PARTNER_NONE;
1091
if (io->in.propagate) {
1092
state->command = WREPL_REPL_UPDATE2;
1094
state->command = WREPL_REPL_UPDATE;
1098
c->state = COMPOSITE_STATE_IN_PROGRESS;
1099
c->event_ctx = service->task->event_ctx;
1100
c->private_data = state;
1102
state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT;
1103
state->creq = wreplsrv_out_connect_send(io->in.partner, partner_type, NULL);
1104
if (!state->creq) goto failed;
1106
state->creq->async.fn = wreplsrv_push_notify_handler_creq;
1107
state->creq->async.private_data = state;
1115
NTSTATUS wreplsrv_push_notify_recv(struct composite_context *c)
1119
status = composite_wait(c);