1
// DistributionSocketLink.cc
3
// Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2010, 2011 Rob Caelers <robc@krandor.org>
4
// All rights reserved.
6
// This program is free software: you can redistribute it and/or modify
7
// it under the terms of the GNU General Public License as published by
8
// the Free Software Foundation, either version 3 of the License, or
9
// (at your option) any later version.
11
// This program is distributed in the hope that it will be useful,
12
// but WITHOUT ANY WARRANTY; without even the implied warranty of
13
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
// GNU General Public License for more details.
16
// You should have received a copy of the GNU General Public License
17
// along with this program. If not, see <http://www.gnu.org/licenses/>.
24
#ifdef HAVE_DISTRIBUTION
40
#include "Configurator.hh"
41
#include "CoreConfig.hh"
43
#include "DistributionManager.hh"
44
#include "DistributionLink.hh"
45
#include "DistributionSocketLink.hh"
51
//! Construct a new socket link.
53
* \param conf Configurator to use.
55
DistributionSocketLink::DistributionSocketLink(Configurator *conf) :
63
server_port(DEFAULT_PORT),
65
network_enabled(false),
66
server_enabled(false),
67
reconnect_attempts(DEFAULT_ATTEMPTS),
68
reconnect_interval(DEFAULT_INTERVAL),
71
socket_driver = SocketDriver::create();
76
//! Destructs the socket link.
77
DistributionSocketLink::~DistributionSocketLink()
88
//! Initialize the link.
90
DistributionSocketLink::init()
92
TRACE_ENTER("DistributionSocketLink::init");
98
// Read all tcp link configuration.
100
configurator->add_listener(CoreConfig::CFG_KEY_DISTRIBUTION_TCP, this);
106
//! Periodic heartbeat.
108
DistributionSocketLink::heartbeat()
114
time_t current_time = time(NULL);
116
// See if we have some clients that need reconnecting.
117
list<Client *>::iterator i = clients.begin();
118
while (i != clients.end())
121
if (c->type == CLIENTTYPE_DIRECT &&
122
c->reconnect_count > 0 &&
123
c->reconnect_time != 0 && current_time >= c->reconnect_time &&
126
c->reconnect_count--;
127
c->reconnect_time = 0;
129
dist_manager->log(_("Reconnecting to %s."),
130
c->id == NULL ? "Unknown" : c->id);
132
if (c->socket != NULL)
138
ISocket *socket = socket_driver->create_socket();
140
socket->set_listener(this);
141
socket->connect(c->hostname, c->port);
148
// Periodically distribute state, in case the master crashes.
149
if (heartbeat_count % 30 == 0 && i_am_master)
151
send_client_message(DCMT_MASTER);
157
//! Initializes the id of this node, using the "id" file in the home directory.
159
DistributionSocketLink::init_my_id()
161
TRACE_ENTER("DistributionSocketLink::init_my_id");
163
string idfilename = Util::get_home_directory() + "id";
165
if (Util::file_exists(idfilename))
167
ifstream file(idfilename.c_str());
174
if (id_str.length() == WRID::STR_LENGTH)
176
ok = my_id.set(id_str);
184
ofstream file(idfilename.c_str());
186
file << my_id.str() << endl;
195
//! Returns the id of this node.
197
DistributionSocketLink::get_my_id() const
203
//! Returns the total number of peers in the network.
205
DistributionSocketLink::get_number_of_peers()
209
list<Client *>::iterator i = clients.begin();
210
while (i != clients.end())
213
if (c->socket != NULL)
224
//! Join the WR network.
226
DistributionSocketLink::connect(string url)
230
std::string::size_type pos = url.find("://");
231
std::string hostport;
233
if (pos == std::string::npos)
239
hostport = url.substr(pos + 3);
242
pos = hostport.rfind(":");
244
std::string port = "0";
246
if (pos == std::string::npos)
252
host = hostport.substr(0, pos);
253
port = hostport.substr(pos + 1);
256
add_client(NULL, (gchar *)host.c_str(), atoi(port.c_str()), CLIENTTYPE_DIRECT);
261
//! Disconnects the specified client.
263
DistributionSocketLink::disconnect(string id)
265
TRACE_ENTER_MSG("DistributionSocketLink::disconnect", id);
266
Client *c = find_client_by_id((gchar*)id.c_str());
276
//! Disconnects all clients.
278
DistributionSocketLink::disconnect_all()
280
list<Client *>::iterator i = clients.begin();
283
master_client = NULL;
285
while (i != clients.end())
298
//! Reconnects all clients.
300
DistributionSocketLink::reconnect_all()
302
list<Client *>::iterator i = clients.begin();
305
while (i != clients.end())
307
if ((*i)->type == CLIENTTYPE_DIRECT)
309
close_client(*i, true);
319
//! Attempt to become the master node.
321
* \return true if the claim was successful.
324
DistributionSocketLink::claim()
326
TRACE_ENTER("DistributionSocketLink::claim");
329
if (master_client != NULL)
331
// Another client is master. Politely request to become
333
send_claim(master_client);
336
else if (!i_am_master && clients.size() > 0)
338
// No one is master. Just force to be master
339
// Potential problem when several clients do this simultaneously...
345
// No one is master, no other clients. Be happy.
353
//! Lock the master status. Claim will be denied when locked.
355
DistributionSocketLink::set_lock_master(bool lock)
357
master_locked = lock;
362
//! Sets the username and password.
364
DistributionSocketLink::set_user(string user, string pw)
369
username = g_strdup(user.c_str());
370
password = g_strdup(pw.c_str());
374
//! Sets the distribution manager for callbacks.
376
DistributionSocketLink::set_distribution_manager(DistributionManager *dll)
382
//! Enable/disable connecting to distributed operation.
384
DistributionSocketLink::set_network_enabled(bool enabled)
386
if (network_enabled && !enabled)
388
network_enabled = enabled;
389
set_server_enabled(false);
392
else if (!network_enabled && enabled)
394
network_enabled = enabled;
396
set_server_enabled(server_enabled);
399
return network_enabled;
403
//! Enable/disable listening for distributed operation.
405
DistributionSocketLink::set_server_enabled(bool enabled)
407
TRACE_ENTER_MSG("DistributionSocketLink:set_server_enabled", enabled);
408
bool ret = server_enabled;
410
if (!network_enabled) {
411
// Don't start the server if the network is not enabled
415
if (server_socket == NULL && enabled)
417
// Switching from disabled to enabled;
418
if (!start_async_server())
420
// We did not succeed in starting the server. Arghh.
421
dist_manager->log(_("Could not enable network operation."));
425
else if (server_socket != NULL && !enabled)
427
// Switching from enabled to disabled.
428
if (server_socket != NULL)
430
dist_manager->log(_("Disabling network operation."));
432
delete server_socket;
434
server_socket = NULL;
438
server_enabled = enabled;
444
//! Register a distributed client message callback.
446
DistributionSocketLink::register_client_message(DistributionClientMessageID id,
447
DistributionClientMessageType type,
448
IDistributionClientMessage *callback)
450
ClientMessageListener sl;
451
sl.listener = callback;
454
client_message_map[id] = sl;
460
//! Unregister a distributed client message callback.
462
DistributionSocketLink::unregister_client_message(DistributionClientMessageID id)
469
//! Broadcasts a single client message to all clients.
471
DistributionSocketLink::broadcast_client_message(DistributionClientMessageID dsid,
472
PacketBuffer &buffer)
474
TRACE_ENTER("DistributionSocketLink::broadcast_client_message");
478
init_packet(packet, PACKET_CLIENTMSG);
480
string id = get_master();
481
packet.pack_string(id);
483
packet.pack_ushort(1);
484
packet.pack_ushort(dsid);
485
packet.pack_ushort(buffer.bytes_written());
486
packet.pack_raw((unsigned char *)buffer.get_buffer(),
487
buffer.bytes_written());
489
send_packet_broadcast(packet);
495
//! Returns whether the specified client is this client.
497
DistributionSocketLink::client_is_me(gchar *id)
499
return id != NULL && strcmp(id, my_id.str().c_str()) == 0;
504
//! Returns whether the specified client exists.
506
* This method checks if the specified client is an existing remote
507
* client or the local client.
510
DistributionSocketLink::exists_client(gchar *id)
512
TRACE_ENTER_MSG("DistributionSocketLink::exists_client", id);
514
bool ret = client_is_me(id);
518
Client *c = find_client_by_id(id);
527
//! Adds a new client and connect to it.
529
DistributionSocketLink::add_client(gchar *id, gchar *host, gint port, ClientType type, Client *peer)
531
TRACE_ENTER_MSG("DistributionSocketLink::add_client",
532
(id != NULL ? id : "NULL") << " " <<
533
(host != NULL ? host : "NULL") << " " << port);
535
gchar *canonical_host = NULL;
537
Client *c = find_client_by_canonicalname(host, port);
538
if (c != NULL && c->type == CLIENTTYPE_SIGNEDOFF && type == CLIENTTYPE_DIRECT)
542
dist_manager->signon_remote_client(c->id);
546
dist_manager->log(_("Connecting to %s."), host);
548
if (c->socket != NULL)
554
ISocket *socket = socket_driver->create_socket();
556
socket->set_listener(this);
557
socket->connect(host, port);
563
// Client does not yet exist as far as we can see.
564
// So, create a new one.
565
Client *client = new Client;
569
client->packet.create();
570
client->hostname = g_strdup(host);
571
client->id = g_strdup(id);
574
clients.push_back(client);
576
if (client->id != NULL)
578
dist_manager->signon_remote_client(client->id);
581
if (type == CLIENTTYPE_DIRECT)
583
dist_manager->log(_("Connecting to %s."), host);
585
if (client->socket != NULL)
587
client->socket->close();
588
delete client->socket;
591
ISocket *socket = socket_driver->create_socket();
592
socket->set_data(client);
593
socket->set_listener(this);
594
socket->connect(host, port);
596
client->socket = socket;
599
g_free(canonical_host);
605
//! Sets the id of a client.
607
* This method also checks for duplicates.
609
* \return true if the client is a duplicate. The id will not
610
* be changed if the client is a duplicate.
613
DistributionSocketLink::set_client_id(Client *client, gchar *id)
615
TRACE_ENTER_MSG("DistributionSocketLink::set_id", id);
618
bool exists = exists_client(id);
621
// Already have a client with this name/port
622
Client *old_client = find_client_by_id(id);
624
if (old_client == NULL)
627
TRACE_MSG("It'me me");
630
else if (old_client != client)
632
TRACE_MSG("It's not me " << old_client->type << " " << old_client->socket);
633
// It's a remote client, but not the same one.
635
bool reuse = ((old_client->type == CLIENTTYPE_DIRECT ||
636
old_client->type == CLIENTTYPE_SIGNEDOFF)
637
&& old_client->socket == NULL);
639
TRACE_MSG("reuse " << reuse);
642
// Client exists, but is not connected.
643
// Silently remove the old client.
644
remove_client(old_client);
648
// Already connected to this client.
655
// It's ok. It's the same one.
661
// No duplicate, so change the canonical name.
663
g_free(client->hostname);
664
client->id = g_strdup(id);
665
client->hostname = NULL;
668
if (client->id != NULL)
670
dist_manager->signon_remote_client(client->id);
679
//! Removes a client (or all clients)
681
* Network connections to removed client are closed.
683
* \param client client to remove, or NULL if all clients to be removed.
687
DistributionSocketLink::remove_client(Client *client)
689
list<Client *>::iterator i = clients.begin();
691
if (client == master_client)
693
// Client to be removed is master. Unset master client.
694
master_client = NULL;
697
while (i != clients.end())
699
if (client == NULL || *i == client || (*i)->peer == client)
701
dist_manager->log(_("Removing client %s."),
702
(*i)->id == NULL ? "Unknown" : (*i)->id);
704
i = clients.erase(i);
714
//! Removes all peers of the specified client.
716
DistributionSocketLink::remove_peer_clients(Client *client)
718
TRACE_ENTER("DistributionSocketLink::remove_peer_clients");
720
list<Client *>::iterator i = clients.begin();
721
while (i != clients.end())
723
if ((*i)->peer == client)
725
TRACE_MSG("Client " << (*i)->peer->id << " is peer of " << client->id);
727
dist_manager->log(_("Removing client %s."),
728
(*i)->id == NULL ? "Unknown" : (*i)->id);
729
send_signoff(NULL, *i);
731
if (client == master_client)
733
// Connection to master is lost. Unset master client.
738
i = clients.erase(i);
749
//! Closes the connection to a client.
751
DistributionSocketLink::close_client(Client *client, bool reconnect /* = false*/)
753
TRACE_ENTER_MSG("DistributionSocketLink::close_client",
754
(client->id != NULL ? client->id : "Unknown") << " " << reconnect);
756
if (client == master_client)
758
// Client to be closed is master. Unset master client.
762
if (client->type == CLIENTTYPE_DIRECT)
764
TRACE_MSG("Is direct");
765
// Closing direct connection.
766
dist_manager->log(_("Disconnecting %s"),
767
client->id != NULL ? client->id : "Unknown");
769
// Inform the client that we are disconnecting.
770
send_signoff(client, NULL);
772
if (client->socket != NULL)
774
TRACE_MSG("Still connected");
775
// Still connected. Disconnect.
776
delete client->socket;
777
client->socket = NULL;
781
TRACE_MSG("must reconnected");
782
client->reconnect_count = reconnect_attempts;
783
client->reconnect_time = time(NULL) + 5;
787
TRACE_MSG("set signed off");
788
client->reconnect_count = 0;
789
client->reconnect_time = 0;
790
client->type = CLIENTTYPE_SIGNEDOFF;
794
send_signoff(NULL, client);
795
remove_peer_clients(client);
797
else if (client->type == CLIENTTYPE_SIGNEDOFF)
799
TRACE_MSG("is signed off");
800
if (client->socket != NULL)
802
TRACE_MSG("still connected");
804
// Still connected. Disconnect.
805
delete client->socket;
806
client->socket = NULL;
808
client->reconnect_count = 0;
809
client->reconnect_time = 0;
812
remove_peer_clients(client);
817
//! Checks whether a client pointer is still valid.
819
DistributionSocketLink::is_client_valid(Client *client)
821
list<Client *>::iterator i = clients.begin();
824
while (!ret && i != clients.end())
837
//! Finds a remote client by its canonical name and port.
838
DistributionSocketLink::Client *
839
DistributionSocketLink::find_client_by_canonicalname(gchar *name, gint port)
842
list<Client *>::iterator i = clients.begin();
844
while (i != clients.end())
846
if ((*i)->port == port && (*i)->hostname != NULL && strcmp((*i)->hostname, name) == 0)
855
//! Finds a remote client by its id.
856
DistributionSocketLink::Client *
857
DistributionSocketLink::find_client_by_id(gchar *id)
860
list<Client *>::iterator i = clients.begin();
862
while (i != clients.end())
864
if ((*i)->id != NULL && strcmp((*i)->id, id) == 0)
874
//! Returns the master client.
876
DistributionSocketLink::get_master() const
884
else if (master_client != NULL && master_client->id != NULL)
886
id = master_client->id;
892
//! Sets the specified remote client as master.
894
DistributionSocketLink::set_master(Client *client)
896
TRACE_ENTER("DistributionSocketLink::set_master")
897
master_client = client;
900
if (dist_manager != NULL)
902
dist_manager->master_changed(false, client != NULL ? client->id : "");
908
//! Sets the local client as master.
910
DistributionSocketLink::set_me_master()
912
TRACE_ENTER("DistributionSocketLink::set_me_master");
913
master_client = NULL;
916
if (dist_manager != NULL)
918
dist_manager->master_changed(true, get_my_id());
924
//! Sets the specified client master.
926
DistributionSocketLink::set_master_by_id(gchar *id)
928
TRACE_ENTER_MSG("DistributionSocketLink::set_master", id);
930
Client *c = find_client_by_id(id);
934
// It's a remote client. mark it master.
935
dist_manager->log(_("Client %s is now master."),
936
c->id == NULL ? "Unknown" : c->id);
939
else if (strcmp(id, get_my_id().c_str()) == 0)
942
dist_manager->log(_("I'm now master."));
955
//! Initialize an outgoing packet.
957
DistributionSocketLink::init_packet(PacketBuffer &packet, PacketCommand cmd)
960
packet.pack_ushort(0);
966
packet.pack_ushort(cmd);
970
//! Sends the specified packet to all clients.
972
DistributionSocketLink::send_packet_broadcast(PacketBuffer &packet)
974
TRACE_ENTER("DistributionSocketLink::send_packet_broadcast");
976
send_packet_except(packet, NULL);
982
//! Sends the specified packet to all clients with the exception of one client.
984
DistributionSocketLink::send_packet_except(PacketBuffer &packet, Client *client)
986
TRACE_ENTER("DistributionSocketLink::send_packet_except");
988
gint size = packet.bytes_written();
991
packet.poke_ushort(0, size);
993
list<Client *>::iterator i = clients.begin();
994
while (i != clients.end())
998
if (c != client && c->socket != NULL)
1000
int bytes_written = 0;
1001
c->socket->write(packet.get_buffer(), size, bytes_written);
1010
//! Sends the specified packet to the specified client.
1012
DistributionSocketLink::send_packet(Client *client, PacketBuffer &packet)
1014
TRACE_ENTER("DistributionSocketLink::send_packet");
1016
if (client != NULL && client->type == CLIENTTYPE_ROUTED)
1018
TRACE_MSG("Must route packet.");
1020
if (client->id == NULL)
1022
TRACE_MSG("Client's ID == NULL");
1025
packet.restart_read();
1026
int flags = packet.peek_byte(3);
1028
if (!(flags & PACKETFLAG_DEST) && client->id != NULL)
1030
assert(!(flags & PACKETFLAG_SOURCE));
1032
TRACE_MSG("Add destination " << client->id);
1034
packet.poke_byte(3, flags | PACKETFLAG_DEST);
1036
packet.insert(4, strlen(client->id) + 2);
1037
packet.poke_string(6, client->id);
1039
client = client->peer;
1042
if (client != NULL && client->socket != NULL)
1044
if (client->id != NULL)
1046
TRACE_MSG("Sending to " << client->id);
1049
gint size = packet.bytes_written();
1052
packet.poke_ushort(0, size);
1054
int bytes_written = 0;
1055
client->socket->write(packet.get_buffer(), size, bytes_written);
1062
//! Processes an incoming packet.
1064
DistributionSocketLink::process_client_packet(Client *client)
1066
TRACE_ENTER("DistributionSocketLink::process_client_packet");
1067
PacketBuffer &packet = client->packet;
1069
client->claim_count = 0;
1071
gint size = packet.unpack_ushort();
1072
g_assert(size == packet.bytes_written());
1074
gint version = packet.unpack_byte();
1075
gint flags = packet.unpack_byte();
1077
gint type = packet.unpack_ushort();
1078
TRACE_MSG("type = " << type);
1079
if (client != NULL && client->id != NULL)
1081
TRACE_MSG("From = " << client->id);
1084
Client *source = client;
1085
bool forward = true;
1086
if (flags & PACKETFLAG_SOURCE)
1088
gchar *id = packet.unpack_string();
1090
if (!client_is_me(id))
1092
TRACE_MSG("routed, source = " << id);
1094
source = find_client_by_id(id);
1098
TRACE_MSG("Unknown source. Dropping");
1100
else if (source != client && source->peer != client)
1102
TRACE_MSG("Illegal source in routing.");
1108
TRACE_MSG("Cycle detected.");
1113
// Duplicate client. inform client that it's bogus and close.
1114
// dist_manager->log(_("Client %s:%d is duplicate."), client->hostname, client->port);
1116
// send_duplicate(client);
1117
// remove_client(client);
1122
if (flags & PACKETFLAG_DEST)
1124
gchar *id = packet.unpack_string();
1126
if (id != NULL && !client_is_me(id))
1128
TRACE_MSG("Destination = " << id);
1129
Client *dest = find_client_by_id(id);
1133
TRACE_MSG("Forwarding to destination");
1134
forward_packet(packet, dest, source);
1142
TRACE_MSG("size = " << size << ", version = " << version << ", flags = " << flags);
1144
if (source != NULL || type == PACKET_CLIENT_LIST)
1149
handle_hello(packet, source);
1153
case PACKET_SIGNOFF:
1154
handle_signoff(packet, source);
1158
handle_claim(packet, source);
1161
case PACKET_WELCOME:
1162
handle_welcome(packet, source);
1166
case PACKET_CLIENT_LIST:
1167
forward = handle_client_list(packet, source, client);
1170
case PACKET_NEW_MASTER:
1171
handle_new_master(packet, source);
1174
case PACKET_CLIENTMSG:
1175
handle_client_message(packet, source);
1178
case PACKET_DUPLICATE:
1179
handle_duplicate(packet, source);
1183
case PACKET_CLAIM_REJECT:
1184
handle_claim_reject(packet, source);
1190
forward_packet_except(packet, client, source);
1201
DistributionSocketLink::forward_packet_except(PacketBuffer &packet, Client *client, Client *source)
1203
TRACE_ENTER("DistributionSocketLink::forward_packet_except");
1205
packet.restart_read();
1206
int flags = packet.peek_byte(3);
1207
if (!(flags & PACKETFLAG_SOURCE) && source->id != NULL)
1209
TRACE_MSG("Add source " << source->id);
1210
packet.poke_byte(3, flags | PACKETFLAG_SOURCE);
1211
packet.insert(4, strlen(source->id) + 2);
1212
packet.poke_string(6, source->id);
1214
send_packet_except(packet, client);
1221
DistributionSocketLink::forward_packet(PacketBuffer &packet, Client *dest, Client *source)
1223
TRACE_ENTER("DistributionSocketLink::forward_packet");
1225
packet.restart_read();
1226
int flags = packet.peek_byte(3);
1227
if (!(flags & PACKETFLAG_SOURCE) && source->id != NULL)
1229
TRACE_MSG("Add source " << source->id);
1230
packet.poke_byte(3, flags | PACKETFLAG_SOURCE);
1231
packet.insert(4, strlen(source->id) + 2);
1232
packet.poke_string(6, source->id);
1234
send_packet(dest, packet);
1239
//! Sends a hello to the specified client.
1241
DistributionSocketLink::send_hello(Client *client)
1243
TRACE_ENTER("DistributionSocketLink::send_hello");
1245
PacketBuffer packet;
1248
init_packet(packet, PACKET_HELLO);
1250
packet.pack_string(username);
1251
packet.pack_string(password);
1252
packet.pack_string(get_my_id());
1253
packet.pack_string(get_my_id()); // was: hostname
1254
packet.pack_ushort(server_port);
1256
send_packet(client, packet);
1261
//! Handles a Hello from the specified client.
1263
DistributionSocketLink::handle_hello(PacketBuffer &packet, Client *client)
1265
TRACE_ENTER("DistributionSocketLink::handle_hello");
1267
gchar *user = packet.unpack_string();
1268
gchar *pass = packet.unpack_string();
1269
gchar *id = packet.unpack_string();
1270
/* gchar *name = */ packet.unpack_string();
1271
/* int port = */ packet.unpack_ushort();
1273
dist_manager->log(_("Client %s saying hello."), id != NULL ? id : "Unknown");
1275
if ( (username == NULL || (user != NULL && strcmp(username, user) == 0)) &&
1276
(password == NULL || (pass != NULL && strcmp(password, pass) == 0)))
1278
bool ok = set_client_id(client, id);
1283
send_welcome(client);
1287
// Duplicate client. inform client that it's bogus and close.
1288
dist_manager->log(_("Client %s is duplicate."),
1289
id != NULL ? id : "Unknown");
1291
send_duplicate(client);
1292
remove_client(client);
1297
// Incorrect password.
1298
dist_manager->log(_("Client %s access denied."),
1299
id != NULL ? id : "Unknown");
1300
remove_client(client);
1312
//! Sends a signoff message, either to one client or as a broadcast.
1314
DistributionSocketLink::send_signoff(Client *to, Client *signedoff_client)
1316
TRACE_ENTER("DistributionSocketLink::send_signoff");
1318
PacketBuffer packet;
1321
init_packet(packet, PACKET_SIGNOFF);
1323
if (signedoff_client != NULL)
1325
TRACE_MSG("remote client " << (signedoff_client->id != NULL ? signedoff_client->id : "?"));
1326
packet.pack_string(signedoff_client->id);
1330
TRACE_MSG("me " << my_id.str());
1331
packet.pack_string(get_my_id());
1336
TRACE_MSG("sending to " << (to->id != NULL ? to->id : "?"));
1337
send_packet(to, packet);
1341
TRACE_MSG("broadcasting");
1342
send_packet_broadcast(packet);
1348
//! Handles a signoff from the specified client.
1350
DistributionSocketLink::handle_signoff(PacketBuffer &packet, Client *client)
1352
TRACE_ENTER("DistributionSocketLink::handle_signoff");
1356
gchar *id = packet.unpack_string();
1361
c = find_client_by_id(id);
1367
dist_manager->log(_("Client %s signed off."),
1368
c->id == NULL ? "Unknown" : c->id);
1370
if (c->type == CLIENTTYPE_DIRECT)
1372
TRACE_MSG("Direct connection. setting signedoff");
1373
c->type = CLIENTTYPE_SIGNEDOFF;
1374
remove_peer_clients(c);
1376
if (c->socket != NULL)
1378
TRACE_MSG("Remove connection");
1387
TRACE_MSG("Routed connection. removing");
1396
//! Sends a duplicate to the specified client.
1398
DistributionSocketLink::send_duplicate(Client *client)
1400
TRACE_ENTER("DistributionSocketLink::send_duplicate");
1402
PacketBuffer packet;
1405
init_packet(packet, PACKET_DUPLICATE);
1407
send_packet(client, packet);
1412
//! Handles a duplicate for the specified client.
1414
DistributionSocketLink::handle_duplicate(PacketBuffer &packet, Client *client)
1417
TRACE_ENTER("DistributionSocketLink::handle_duplicate");
1418
dist_manager->log(_("Client %s is duplicate."),
1419
client->id == NULL ? "Unknown" : client->id);
1420
remove_client(client);
1426
//! Sends a welcome message to the specified client
1428
DistributionSocketLink::send_welcome(Client *client)
1430
TRACE_ENTER("DistributionSocketLink::send_welcome");
1432
PacketBuffer packet;
1435
init_packet(packet, PACKET_WELCOME);
1438
packet.pack_string(get_my_id());
1439
packet.pack_string(get_my_id()); // was: hostname
1440
packet.pack_ushort(server_port);
1442
send_packet(client, packet);
1447
//! Handles a welcome message from the specified client.
1449
DistributionSocketLink::handle_welcome(PacketBuffer &packet, Client *client)
1451
TRACE_ENTER("DistributionSocketLink::handle_welcome");
1453
gchar *id = packet.unpack_string();
1454
gchar *name = packet.unpack_string();
1455
/*gint port = */ packet.unpack_ushort();
1457
dist_manager->log(_("Client %s is welcoming us."),
1458
id == NULL ? "Unknown" : id);
1460
bool ok = set_client_id(client, id);
1464
// The connected client offers the master client.
1465
// This info will be received in the client list.
1466
// So, we no longer know who's master...
1469
// All, ok. Send list of known client.
1470
// WITHOUT info about who's master on our side.
1471
send_client_list(client);
1476
send_duplicate(client);
1477
remove_client(client);
1487
//! Sends the list of known clients to the specified client.
1489
DistributionSocketLink::send_client_list(Client *client, bool except)
1491
TRACE_ENTER("DistributionSocketLink::send_client_list");
1493
if (clients.size() > 0)
1495
PacketBuffer packet;
1497
init_packet(packet, PACKET_CLIENT_LIST);
1500
gint clients_pos = packet.bytes_written();
1502
packet.pack_ushort(0); // number of clients in the list
1503
packet.pack_ushort(0); // flags.
1505
// Put myself in the list.
1506
gint pos = packet.bytes_written();
1508
TRACE_MSG("client me: " << my_id.str() << " " << server_port << " " << i_am_master);
1509
int flags = CLIENTLIST_ME | (i_am_master ? CLIENTLIST_MASTER : 0);
1510
packet.pack_ushort(0); // Length
1511
packet.pack_ushort(flags); // Flags
1512
packet.pack_string(get_my_id()); // ID
1513
packet.pack_string(get_my_id()); // Canonical name
1514
packet.pack_ushort(server_port); // Listen port.
1516
// Size of the client data.
1517
packet.poke_ushort(pos, packet.bytes_written() - pos);
1519
// Put known clients in the list.
1520
list<Client *>::iterator i = clients.begin();
1521
while (i != clients.end())
1528
pos = packet.bytes_written();
1531
if (c == master_client)
1533
flags |= CLIENTLIST_MASTER;
1536
TRACE_MSG("Send client: " << c->id);
1538
packet.pack_ushort(0); // Length
1539
packet.pack_ushort(flags); // Flags
1540
packet.pack_string(c->id); // ID
1541
packet.pack_string(c->hostname); // Canonical name
1542
packet.pack_ushort(c->port); // Listen port.
1544
// Size of the client data.
1545
packet.poke_ushort(pos, packet.bytes_written() - pos);
1550
// Put the number of clients in the packet and send.
1551
packet.poke_ushort(clients_pos, count);
1555
send_packet_except(packet, client);
1559
send_packet(client, packet);
1567
//! Handles a client list from the specified client.
1569
DistributionSocketLink::handle_client_list(PacketBuffer &packet, Client *client, Client *direct)
1571
TRACE_ENTER("DistributionSocketLink::handle_client_list");
1574
gint num_clients = packet.unpack_ushort();
1575
gint flags = packet.unpack_ushort();
1578
gchar *master_id = NULL;
1580
gchar **names = new gchar*[num_clients];
1581
gchar **ids = new gchar*[num_clients];
1582
gint *ports = new gint[num_clients];
1586
// Loop over remote clients.
1587
for (int i = 0; i < num_clients; i++)
1594
gint pos = packet.bytes_read();
1595
gint size = packet.unpack_ushort();
1596
gint flags = packet.unpack_ushort();
1597
gchar *id = packet.unpack_string();
1598
gchar *name = packet.unpack_string();
1599
gint port = packet.unpack_ushort();
1601
if (flags & CLIENTLIST_MASTER)
1603
master_id = g_strdup(id);
1604
TRACE_MSG("Master: " << master_id);
1609
if (!exists_client(id))
1612
TRACE_MSG("new client: " << id);
1617
else if (client != NULL && direct == client && !client_is_me(id) && strcmp(client->id, id) != 0)
1619
TRACE_MSG("Strange client: " << id);
1624
// Skip trailing junk...
1625
size -= (packet.bytes_read() - pos);
1635
// And send the list of clients we are connected to.
1636
if (client != NULL && direct == client && !client->sent_client_list)
1638
client->sent_client_list = true;
1639
send_client_list(client);
1642
TRACE_MSG("Adding: ");
1643
for (int i = 0; i < num_clients; i++)
1645
if (ids[i] != NULL && names[i] != NULL)
1647
add_client(ids[i], names[i], ports[i], CLIENTTYPE_ROUTED, direct);
1651
if (master_id != NULL)
1653
set_master_by_id(master_id);
1654
TRACE_MSG(master_id << " is now master");
1657
send_client_message(DCMT_SIGNON);
1662
dist_manager->log(_("Client %s is duplicate."),
1663
client->id != NULL ? client->id : "Unknown");
1665
send_duplicate(client);
1666
remove_client(client);
1672
for (int i = 0; i < num_clients; i++)
1688
//! Requests to become master.
1690
DistributionSocketLink::send_claim(Client *client)
1692
TRACE_ENTER("DistributionSocketLink::send_claim");
1694
if (client->next_claim_time == 0 || time(NULL) >= client->next_claim_time)
1696
PacketBuffer packet;
1698
dist_manager->log(_("Requesting master status from %s."),
1699
client->id == NULL ? "Unknown" : client->id);
1702
init_packet(packet, PACKET_CLAIM);
1704
packet.pack_ushort(0);
1706
client->next_claim_time = time(NULL) + 10;
1708
send_packet(client, packet);
1710
if (client->claim_count >= 3)
1712
dist_manager->log(_("Client timeout from %s."),
1713
client->id == NULL ? "Unknown" : client->id);
1715
close_client(client, client->outbound);
1717
client->claim_count++;
1724
//! Handles a request from a remote client to become master.
1726
DistributionSocketLink::handle_claim(PacketBuffer &packet, Client *client)
1728
TRACE_ENTER("DistributionSocketLink::handle_claim");
1730
/*gint count = */ packet.unpack_ushort();
1732
if (i_am_master && master_locked)
1734
dist_manager->log(_("Rejecting master request from client %s."),
1735
client->id == NULL ? "Unknown" : client->id);
1736
send_claim_reject(client);
1740
dist_manager->log(_("Acknowledging master request from client %s."),
1741
client->id == NULL ? "Unknown" : client->id);
1743
bool was_master = i_am_master;
1745
// Marks client as master
1747
assert(!i_am_master);
1749
// If I was previously master, distribute state.
1752
//dist_manager->log(_("Transferring state to client %s:%d."),
1753
// client->hostname, client->port);
1754
send_client_message(DCMT_MASTER);
1757
// And tell everyone we have a new master.
1766
//! Inform that the claim has been rejected.
1768
DistributionSocketLink::send_claim_reject(Client *client)
1770
TRACE_ENTER("DistributionSocketLink::send_claim_reject");
1772
PacketBuffer packet;
1775
init_packet(packet, PACKET_CLAIM_REJECT);
1777
send_packet(client, packet);
1782
//! Handles a rejection of my claim.
1784
DistributionSocketLink::handle_claim_reject(PacketBuffer &packet, Client *client)
1786
TRACE_ENTER("DistributionSocketLink::handle_claim");
1789
if (client != master_client)
1791
dist_manager->log(_("Non-master client %s rejected master request."),
1792
client->id == NULL ? "Unknown" : client->id);
1796
dist_manager->log(_("Client %s rejected master request, delaying."),
1797
client->id == NULL ? "Unknown" : client->id);
1798
client->reject_count++;
1799
int count = client->reject_count;
1806
client->next_claim_time = time(NULL) + 5 * count;
1812
//! Informs the specified client (or all remote clients) that a new client is now master.
1814
DistributionSocketLink::send_new_master(Client *client)
1816
TRACE_ENTER("DistributionSocketLink::send_new_master");
1818
PacketBuffer packet;
1821
init_packet(packet, PACKET_NEW_MASTER);
1825
if (master_client == NULL)
1827
// I've become master
1830
else if (master_client->id != NULL)
1832
// Another remote client becomes master
1833
id = master_client->id;
1836
packet.pack_string(id);
1837
packet.pack_ushort(0);
1841
send_packet(client, packet);
1845
send_packet_broadcast(packet);
1853
//! Handles a new master event.
/*!
 *  Resets the reject counter of every known client, reads the id of the
 *  new master from the packet, logs it and records it through
 *  set_master_by_id().
 *
 *  \param packet the received packet; holds the master id string followed
 *                by an unsigned short that is read but ignored.
 *  \param client the client that sent the packet; only used for tracing
 *                in the code visible here.
 */
1855
DistributionSocketLink::handle_new_master(PacketBuffer &packet, Client *client)
1857
TRACE_ENTER("DistributionSocketLink::handle_new_master");
1859
// Clear the reject counter of every client.
for (list<Client *>::iterator i = clients.begin(); i != clients.end(); i++)
1861
(*i)->reject_count = 0;
1864
// NOTE(review): whether the string returned by unpack_string() has to
// be freed by the caller is not visible here; confirm.
gchar *id = packet.unpack_string();
1865
// The ushort is consumed to advance the read position; its value is unused.
/* gint count = */ packet.unpack_ushort();
1867
dist_manager->log(_("Client %s is now the new master."),
1868
id == NULL ? "Unknown" : id);
1870
if (client->id != NULL)
1872
TRACE_MSG("new master from " << client->id << " -> " << id);
1875
set_master_by_id(id);
1883
// Distributes the current client message.
/*!
 *  Builds one PACKET_CLIENTMSG packet and broadcasts it. The packet
 *  contains the id of the current master, the number of entries in
 *  client_message_map and, per entry, the message id followed by a
 *  size-prefixed data section.
 *
 *  \param type message type; a listener is asked for data only when its
 *              registered type mask has a bit in common with \c type.
 */
1885
DistributionSocketLink::send_client_message(DistributionClientMessageType type)
1887
TRACE_ENTER("DistributionSocketLink:send_client_message");
1889
PacketBuffer packet;
1891
init_packet(packet, PACKET_CLIENTMSG);
1893
// Header: id of the current master.
string id = get_master();
1894
packet.pack_string(id);
1896
// Number of per-message sections that follow.
packet.pack_ushort(client_message_map.size());
1898
ClientMessageMap::iterator i = client_message_map.begin();
1899
while (i != client_message_map.end())
1901
// This 'id' (message id) shadows the master id string declared above.
DistributionClientMessageID id = i->first;
1902
ClientMessageListener &sl = i->second;
1904
IDistributionClientMessage *itf = sl.listener;
1907
packet.pack_ushort(id);
1908
// Reserve room for the section size; it is filled in by update_size()
// below. The declaration of 'pos' is not visible here.
packet.reserve_size(pos);
1910
// Only listeners registered for this message type contribute data;
// the other sections carry no listener data.
if ((sl.type & type) != 0)
1912
TRACE_MSG("request " << id << " " << type);
1913
itf->request_client_message(id, packet);
1916
packet.update_size(pos);
1921
send_packet_broadcast(packet);
1926
//! Handles client message from a remote client.
/*!
 *  Reads the master id and the section count from the packet, then walks
 *  the sections. Each section's data is handed to the listener registered
 *  for its message id in client_message_map, if there is one.
 *
 *  \param packet the received packet.
 *  \param client the client that sent the packet; its id is passed on to
 *                the listeners.
 */
1928
DistributionSocketLink::handle_client_message(PacketBuffer &packet, Client *client)
1930
TRACE_ENTER("DistributionSocketLink:handle_client_message");
1933
bool will_i_become_master = false;
1935
// dist_manager->log(_("Reveived client message from client %s:%d."), client->hostname, client->port);
1937
// NOTE(review): whether the string returned by unpack_string() has to
// be freed by the caller is not visible here; confirm.
gchar *id = packet.unpack_string();
1941
//TRACE_MSG("id = " << id);
1943
// True when the master id carried in the packet refers to this instance.
will_i_become_master = client_is_me(id);
1947
// Number of message sections that follow.
gint size = packet.unpack_ushort();
1950
TRACE_MSG("size = " << size);
1951
for (int i = 0; i < size; i++)
1953
// This 'id' (message id) shadows the master id string read above.
DistributionClientMessageID id = (DistributionClientMessageID) packet.unpack_ushort();
1954
// Length of this section's data. The declaration of 'pos' is not
// visible here.
gint datalen = packet.read_size(pos);
1956
TRACE_MSG("len = " << datalen << " " << id);
1960
// Narrow the buffer to the client message data.
1961
packet.narrow(-1, datalen);
1963
ClientMessageMap::iterator it = client_message_map.find(id);
1964
if (it != client_message_map.end())
1966
// NOTE(review): looks up the entry a second time through operator[]
// although 'it' already refers to it.
client_message_map[id].listener->client_message(id, will_i_become_master, client->id, packet);
1969
// Undo the narrowing and move on to the next section.
packet.narrow(0, -1);
1972
packet.skip_size(pos);
1980
//! Starts the server socket.
/*!
 *  Creates a server socket through the socket driver, registers this
 *  object as its listener and starts listening on server_port. A
 *  SocketException is caught; how it is handled and what the function
 *  returns is not visible here.
 */
DistributionSocketLink::start_async_server()
1982
TRACE_ENTER("DistributionSocketLink::start_async_server");
1987
/* Create the server */
1988
server_socket = socket_driver->create_server();
1990
if (server_socket != NULL)
1992
server_socket->set_listener(this);
1993
server_socket->listen(server_port);
1994
dist_manager->log(_("Network operation started."));
1998
// NOTE(review): the exception is caught by value; catching by const
// reference would avoid the copy.
catch(SocketException e)
2008
//! Called when the server socket has accepted a new connection.
/*!
 *  Creates a Client record of type CLIENTTYPE_DIRECT for the connection,
 *  attaches it to the socket as user data, registers this object as the
 *  socket's listener and adds the client to the clients list.
 *
 *  \param scon the server socket; not used by the code visible here.
 *  \param ccon the socket of the newly accepted connection.
 */
DistributionSocketLink::socket_accepted(ISocketServer *scon, ISocket *ccon)
2012
TRACE_ENTER("DistributionSocketLink::socket_accepted");
2015
dist_manager->log(_("Accepted new client."));
2017
Client *client = new Client;
2018
client->type = CLIENTTYPE_DIRECT;
2019
client->peer = NULL;
2020
// Set up the client's receive buffer.
client->packet.create();
2022
// NOTE(review): TRACE_RETURN is used here in the middle of the function;
// presumably it only logs and does not return - confirm.
TRACE_RETURN(client->packet.bytes_available());
2023
client->socket = ccon;
2024
client->hostname = NULL;
2027
client->reconnect_count = 0;
2028
client->reconnect_time = 0;
2030
// Attach the client to the socket; the socket callbacks cast their
// 'data' argument back to Client *.
ccon->set_data(client);
2031
ccon->set_listener(this);
2032
clients.push_back(client);
2039
//! Called when data is available on a client socket.
/*!
 *  Reads data from the socket into the client's packet buffer. Once the
 *  buffer holds as many bytes as the length field at offset 0 announces,
 *  the packet is passed to process_client_packet(). Read errors and a
 *  zero-byte read are logged.
 *
 *  \param con  the socket that has data.
 *  \param data the Client attached to the socket.
 */
DistributionSocketLink::socket_io(ISocket *con, void *data)
2041
TRACE_ENTER("DistributionSocketLink::socket_io");
2044
Client *client = (Client *)data;
2045
g_assert(client != NULL);
2049
// NOTE(review): 'client' is dereferenced in the same condition that has
// just found it not to be valid; confirm this cannot touch a stale
// pointer.
if (!is_client_valid(client) && client->type == CLIENTTYPE_DIRECT)
2051
TRACE_RETURN("Invalid client");
2056
// Default: read 4 bytes. NOTE(review): presumably this is the size of
// the packet header that contains the length field - confirm.
int bytes_to_read = 4;
2058
TRACE_MSG("2 " << client->packet.bytes_available() );
2060
if (client->packet.bytes_available() >= 4)
2064
// The ushort at offset 0 appears to be the total packet length (it is
// compared against bytes_written() below), so subtract the 4 bytes
// accounted for above.
bytes_to_read = client->packet.peek_ushort(0) - 4;
2066
TRACE_MSG("4 " << bytes_to_read);
2068
// Grow the buffer when the announced packet does not fit.
if (bytes_to_read + 4 > client->packet.get_buffer_size())
2070
TRACE_MSG("5 " << bytes_to_read << " " << client->packet.get_buffer_size());
2071
// FIXME: the 1024 is lame...
2072
client->packet.resize(bytes_to_read + 4 + 1024);
2080
// The declaration of 'bytes_read' is not visible here.
con->read(client->packet.get_write_ptr(), bytes_to_read, bytes_read);
2082
catch (SocketException)
2089
dist_manager->log(_("Client %s read error, closing."),
2090
client->id == NULL ? "Unknown" : client->id);
2093
// A read of zero bytes is reported as the peer closing the connection.
else if (bytes_read == 0)
2095
dist_manager->log(_("Client %s closed connection."),
2096
client->id == NULL ? "Unknown" : client->id);
2101
g_assert(bytes_read > 0);
2102
// Account for the bytes that read() stored at the write pointer.
client->packet.write_ptr += bytes_read;
2104
// The packet is complete when the announced length has been received.
if (client->packet.peek_ushort(0) == client->packet.bytes_written())
2106
process_client_packet(client);
2112
// NOTE(review): presumably only reached on the error/closed paths above;
// the guarding condition is not visible here - confirm.
close_client(client, client->outbound);
2121
//! Called when an outbound connection has been established.
/*!
 *  Logs the connection, resets the client's reconnect bookkeeping, marks
 *  the client as outbound and stores the connected socket in it.
 *
 *  \param con  the socket that is now connected.
 *  \param data the Client attached to the socket.
 */
DistributionSocketLink::socket_connected(ISocket *con, void *data)
2123
TRACE_ENTER("DistributionSocketLink::socket_connected");
2125
Client *client = (Client *)data;
2127
g_assert(client != NULL);
2128
g_assert(con != NULL);
2130
// NOTE(review): 'client' is dereferenced in the same condition that has
// just found it not to be valid; confirm this cannot touch a stale
// pointer.
if (!is_client_valid(client) && client->type == CLIENTTYPE_DIRECT)
2132
TRACE_RETURN("Invalid client");
2136
dist_manager->log(_("Client %s connected."),
2137
client->id != NULL ? client->id : "Unknown");
2139
// The connection succeeded: clear the reconnect state.
client->reconnect_count = 0;
2140
client->reconnect_time = 0;
2141
client->outbound = true;
2142
client->socket = con;
2151
//! Called when a client socket has been closed or failed to connect.
/*!
 *  If the client already has a socket, the event is logged as a closed
 *  connection and the client is closed via close_client(). The other
 *  path logs that the client could not be connected to and removes it
 *  via remove_client().
 *
 *  \param con  the socket that was closed; not used by the code visible
 *              here.
 *  \param data the Client attached to the socket.
 */
DistributionSocketLink::socket_closed(ISocket *con, void *data)
2153
TRACE_ENTER("DistributionSocketLink::socket_closed");
2156
Client *client = (Client *)data;
2157
assert(client != NULL);
2159
// NOTE(review): 'client' is dereferenced in the same condition that has
// just found it not to be valid; confirm this cannot touch a stale
// pointer.
if (!is_client_valid(client) && client->type == CLIENTTYPE_DIRECT)
2161
TRACE_RETURN("Invalid client");
2165
// Socket error. Disable client.
2166
// client->socket is assigned in socket_accepted() and socket_connected(),
// so a non-NULL socket means the connection had been established.
if (client->socket != NULL)
2168
dist_manager->log(_("Client %s closed connection."),
2169
client->id != NULL ? client->id : "Unknown");
2170
close_client(client, client->outbound);
2174
// NOTE(review): presumably the else branch (no socket was ever set, so
// the connect attempt failed) - the branch structure is not visible
// here; confirm.
dist_manager->log(_("Could not connect to client %s."),
2175
client->id != NULL ? client->id : "Unknown");
2176
remove_client(client);
2183
//! Read the configuration from the configurator.
/*!
 *  Refreshes server_port, reconnect_interval, reconnect_attempts, username
 *  and password. The port is taken from the WORKRAVE_PORT environment
 *  variable or from the distribution manager. If the port changed while
 *  the server is enabled, the server is restarted.
 */
2185
DistributionSocketLink::read_configuration()
2187
// Remember the current port to detect a change below.
int old_port = server_port;
2189
const char *port = getenv("WORKRAVE_PORT");
2192
// NOTE(review): getenv() returns NULL when the variable is unset;
// presumably a NULL check not visible here guards this call and selects
// between the two assignments - confirm.
server_port = atoi(port);
2196
server_port = dist_manager->get_port();
2199
// Restart the server so that it listens on the new port.
if (old_port != server_port && server_enabled)
2201
set_server_enabled(false);
2202
set_server_enabled(true);
2205
reconnect_interval = dist_manager->get_reconnect_interval();
2206
reconnect_attempts = dist_manager->get_reconnect_attempts();
2209
// An empty string is stored as NULL.
// NOTE(review): the previous username/password are overwritten without a
// visible g_free(); this may leak on every reconfiguration - confirm.
str = dist_manager->get_username();
2210
username = str != "" ? g_strdup(str.c_str()) : NULL;
2212
str = dist_manager->get_password();
2213
password = str != "" ? g_strdup(str.c_str()) : NULL;
2217
//! Notification from the configurator that the configuration has changed.
/*!
 *  Re-reads the link configuration.
 *
 *  \param key the configuration key that changed; not inspected by the
 *             code visible here.
 */
2219
DistributionSocketLink::config_changed_notify(const string &key)
2222
read_configuration();