81
82
struct _DeeSharedModelPrivate
84
GDBusConnection *connection;
87
88
/* Buffer of DeeSharedModelRevisions that we keep in order to batch
88
89
* our DBus signals. The invariant is that all buffered revisions
89
90
* are of the same type */
90
91
GSList *revision_queue;
91
92
guint revision_queue_timeout_id;
92
guint dbus_signal_handler;
93
guint model_registration_id;
93
guint acquisition_timer_id;
94
94
gulong swarm_leader_handler;
95
gulong connection_acquired_handler;
96
gulong connection_closed_handler;
97
GArray *connection_infos;
96
99
gboolean synchronized;
97
100
gboolean found_first_peer;
98
101
gboolean suppress_remote_signals;
138
147
//static guint32 _signals[LAST_SIGNAL] = { 0 };
141
static void on_bus_connection_acquired (GObject *source_object,
150
static void on_connection_acquired (DeeSharedModel *self,
151
GDBusConnection *connection,
154
static void on_connection_closed (DeeSharedModel *self,
155
GDBusConnection *connection,
145
158
static void commit_transaction (DeeSharedModel *self,
146
159
const gchar *sender_name,
192
205
static void reset_model (DeeModel *self);
194
207
static void invalidate_peer (DeeSharedModel *self,
195
const gchar *sender_name);
208
const gchar *sender_name,
209
GDBusConnection *except);
197
211
static gboolean on_invalidate (DeeSharedModel *self);
263
277
DeeSharedModelRevision *rev;
280
GSList *connection_iter;
266
281
GVariant *schema;
282
GVariant *transaction_variant;
267
283
GVariantBuilder aav, au, ay, transaction;
268
284
guint64 seqnum_begin = 0, seqnum_end = 0;
278
294
* we'll assume the programmer knows this
279
295
* 2) We are resetting the model - no problem
281
if (priv->connection == NULL)
297
if (priv->connections == NULL)
283
299
trace_object (self, "Flushing revision queue, without a connection. "
284
300
"This will blow up unless you are the leader model");
368
384
g_variant_builder_add_value (&transaction,
369
385
g_variant_new ("(tt)", seqnum_begin, seqnum_end));
371
/* Throw a Commit signal on the bus */
373
g_dbus_connection_emit_signal(priv->connection,
376
"com.canonical.Dee.Model",
378
g_variant_builder_end (&transaction),
387
transaction_variant = g_variant_builder_end (&transaction);
389
/* Throw a Commit signal */
390
for (connection_iter = priv->connections; connection_iter != NULL;
391
connection_iter = connection_iter->next)
383
g_critical ("Failed to emit DBus signal "
384
"com.canonical.Dee.Model.Commit: %s", error->message);
385
g_error_free (error);
394
g_dbus_connection_emit_signal((GDBusConnection*) connection_iter->data,
397
"com.canonical.Dee.Model",
404
g_critical ("Failed to emit DBus signal "
405
"com.canonical.Dee.Model.Commit: %s", error->message);
406
g_error_free (error);
388
410
trace_object (self, "Flushed %"G_GUINT64_FORMAT" revisions. "
437
460
flush_revision_queue (DEE_MODEL(object));
438
461
priv->revision_queue = NULL;
441
if (priv->model_registration_id != 0 && priv->connection != NULL)
443
g_dbus_connection_unregister_object (priv->connection,
444
priv->model_registration_id);
445
priv->model_registration_id = 0;
447
if (priv->dbus_signal_handler != 0 && priv->connection != NULL)
449
g_dbus_connection_signal_unsubscribe(priv->connection,
450
priv->dbus_signal_handler);
451
priv->dbus_signal_handler = 0;
464
if (priv->acquisition_timer_id != 0)
466
g_source_remove (priv->acquisition_timer_id);
467
priv->acquisition_timer_id = 0;
470
if (priv->connection_acquired_handler)
472
g_signal_handler_disconnect (priv->swarm,
473
priv->connection_acquired_handler);
474
priv->connection_acquired_handler = 0;
477
if (priv->connection_closed_handler)
479
g_signal_handler_disconnect (priv->swarm, priv->connection_closed_handler);
480
priv->connection_closed_handler = 0;
483
if (priv->connection_infos != NULL)
485
for (i = 0; i < priv->connection_infos->len; i++)
487
DeeConnectionInfo *info;
488
info = &g_array_index (priv->connection_infos, DeeConnectionInfo, i);
489
g_dbus_connection_unregister_object (info->connection,
490
info->registration_id);
491
g_dbus_connection_signal_unsubscribe (info->connection,
492
info->signal_subscription_id);
495
g_array_unref (priv->connection_infos);
496
priv->connection_infos = NULL;
453
498
if (priv->swarm_leader_handler != 0)
515
560
g_value_set_object (value, priv->swarm);
517
562
case PROP_SYNCHRONIZED:
518
g_value_set_boolean (value, priv->synchronized);
563
g_value_set_boolean (value, priv->synchronized);
521
566
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, id, pspec);
572
iterate_connections (DeeSharedModel *self)
574
DeeSharedModelPrivate *priv;
577
g_return_val_if_fail (DEE_IS_SHARED_MODEL (self), FALSE);
580
priv->connections = dee_peer_get_connections (priv->swarm);
582
for (iter = priv->connections; iter != NULL; iter = iter->next)
584
on_connection_acquired (self, (GDBusConnection*) iter->data, priv->swarm);
587
priv->acquisition_timer_id = 0;
527
dee_shared_model_constructed (GObject *self)
593
dee_shared_model_constructed (GObject *object)
529
DeeSharedModel *_self;
595
DeeSharedModel *self;
530
596
DeeSharedModelPrivate *priv;
533
_self = DEE_SHARED_MODEL (self);
599
/* GObjectClass has NULL 'constructed' member, but we add this check for
600
* future robustness if we ever move to another base class */
601
if (G_OBJECT_CLASS (dee_shared_model_parent_class)->constructed != NULL)
602
G_OBJECT_CLASS (dee_shared_model_parent_class)->constructed (object);
604
self = DEE_SHARED_MODEL (object);
536
607
if (priv->swarm == NULL)
549
priv->swarm_leader_handler = g_signal_connect_swapped(priv->swarm,
550
"notify::swarm-leader",
551
G_CALLBACK (on_leader_changed),
554
/* Connect asynchronously to the bus */
555
g_bus_get (G_BUS_TYPE_SESSION,
557
on_bus_connection_acquired,
558
g_object_ref (self)); // ref to stay alive during async call
560
/* GObjectClass has NULL 'constructed' member, but we add this check for
561
* future robustness if we ever move to another base class */
562
if (G_OBJECT_CLASS (dee_shared_model_parent_class)->constructed != NULL)
563
G_OBJECT_CLASS (dee_shared_model_parent_class)->constructed (self);
620
priv->swarm_leader_handler =
621
g_signal_connect_swapped (priv->swarm, "notify::swarm-leader",
622
G_CALLBACK (on_leader_changed), self);
624
priv->connection_acquired_handler =
625
g_signal_connect_swapped (priv->swarm, "connection-acquired",
626
G_CALLBACK (on_connection_acquired), self);
628
priv->connection_closed_handler =
629
g_signal_connect_swapped (priv->swarm, "connection-closed",
630
G_CALLBACK (on_connection_closed), self);
632
/* we don't want to invoke on_connection_acquired from here, it would mean
633
* emitting an important signal while inside g_object_new */
634
/* using G_PRIORITY_DEFAULT will ensure that this will be dispatched before
635
* we'll have a chance to acquire new connections... FIXME: right? */
636
priv->acquisition_timer_id = g_idle_add_full (G_PRIORITY_DEFAULT,
637
(GSourceFunc) iterate_connections, self, NULL);
613
687
priv = self->priv = DEE_SHARED_MODEL_GET_PRIVATE (self);
615
priv->synchronized = FALSE;
616
689
priv->swarm = NULL;
617
690
priv->model_path = NULL;
619
692
priv->revision_queue = NULL;
620
693
priv->revision_queue_timeout_id = 0;
621
priv->dbus_signal_handler = 0;
622
priv->model_registration_id = 0;
623
694
priv->swarm_leader_handler = 0;
625
696
priv->synchronized = FALSE;
629
700
if (!dee_shared_model_error_quark)
630
701
dee_shared_model_error_quark = g_quark_from_string ("dbus-model-error");
632
priv->connection = NULL;
703
priv->connections = NULL;
704
priv->connection_infos = g_array_new (FALSE, TRUE, sizeof (DeeConnectionInfo));
634
706
/* Connect to our own signals so we can queue up revisions to be emitted
692
on_bus_connection_acquired (GObject *source_object,
764
on_connection_acquired (DeeSharedModel *self,
765
GDBusConnection *connection,
696
DeeSharedModel *self;
697
768
DeeSharedModelPrivate *priv;
769
DeeConnectionInfo connection_info;
699
770
GDBusNodeInfo *model_introspection_data;
771
guint dbus_signal_handler;
772
guint model_registration_id;
701
774
/* Keep the parsed introspection data of the Model interface around */
702
775
static GDBusInterfaceInfo *model_interface_info = NULL;
704
g_return_if_fail (DEE_IS_SHARED_MODEL (user_data));
777
g_return_if_fail (DEE_IS_SHARED_MODEL (self));
706
self = DEE_SHARED_MODEL (user_data);
707
779
priv = self->priv;
709
if (priv->connection)
711
g_critical ("Internal error in DeeSharedModel. "
712
"DBus connection acquired twice.");
714
/* We don't know whether or not to g_object_unref(self) at this point
715
* since the internal state is messed up. We don't do the unref to try
716
* and avoid double freeing self */
722
priv->connection = g_bus_get_finish (res, &error);
726
g_critical ("DeeSharedModel@%p failed to connect to session bus: %s",
727
self, error->message);
728
g_error_free (error);
729
g_object_unref (self); // held during async call
781
if (connection == NULL)
783
g_warning ("Internal error in DeeSharedModel. %s called with NULL "
784
"connection", __func__);
788
/* Update our list of connections */
789
if (priv->connections) g_slist_free (priv->connections);
790
priv->connections = dee_peer_get_connections (priv->swarm);
733
792
/* Listen for changes from the peers in the same swarm.
734
793
* We do this by matching arg0 with the swarm name */
735
priv->dbus_signal_handler = g_dbus_connection_signal_subscribe (
794
dbus_signal_handler = g_dbus_connection_signal_subscribe (
738
797
"com.canonical.Dee.Model", // iface
760
819
/* Export the model on the bus */
761
priv->model_registration_id =
762
g_dbus_connection_register_object (priv->connection,
820
model_registration_id =
821
g_dbus_connection_register_object (connection,
763
822
priv->model_path, /* object path */
764
823
model_interface_info,
765
824
&model_interface_vtable,
767
826
NULL, /* user_data_free_func */
768
827
NULL); /* GError** */
829
connection_info.connection = connection;
830
connection_info.signal_subscription_id = dbus_signal_handler;
831
connection_info.registration_id = model_registration_id;
832
g_array_append_val (priv->connection_infos, connection_info);
770
834
/* If we are swarm leaders and we have column type info we are ready by now.
771
835
* Otherwise we will be ready when we receive the model clone from the leader
773
837
if (dee_peer_is_swarm_leader (priv->swarm))
775
if (dee_model_get_n_columns (DEE_MODEL (self)) > 0)
839
if (dee_model_get_n_columns (DEE_MODEL (self)) > 0 && !priv->synchronized)
777
841
priv->synchronized = TRUE;
778
842
g_object_notify (G_OBJECT (self), "synchronized");
790
854
// FIXME: There's no known leader
793
g_object_unref (self); // held self-ref during async call
859
on_connection_closed (DeeSharedModel *self,
860
GDBusConnection *connection,
863
DeeSharedModelPrivate *priv;
866
g_return_if_fail (DEE_IS_SHARED_MODEL (self));
870
/* Update our list of connections */
871
if (priv->connections) g_slist_free (priv->connections);
872
priv->connections = dee_peer_get_connections (priv->swarm);
874
/* Disconnect signals etc */
875
for (i = 0; i < priv->connection_infos->len; i++)
877
DeeConnectionInfo *info;
878
info = &g_array_index (priv->connection_infos, DeeConnectionInfo, i);
879
if (info->connection == connection)
881
g_dbus_connection_unregister_object (info->connection,
882
info->registration_id);
883
g_dbus_connection_signal_unsubscribe (info->connection,
884
info->signal_subscription_id);
885
/* remove the item */
886
g_array_remove_index (priv->connection_infos, i);
796
892
/* Callback for clone_leader() */
859
955
clone_leader (DeeSharedModel *self)
861
957
DeeSharedModelPrivate *priv;
863
960
g_return_if_fail (DEE_IS_SHARED_MODEL (self));
864
961
g_return_if_fail (dee_peer_get_swarm_leader (self->priv->swarm) != NULL);
865
g_return_if_fail (self->priv->connection != NULL);
866
962
g_return_if_fail (self->priv->revision_queue == NULL);
867
963
g_return_if_fail (dee_model_get_n_rows (DEE_MODEL (self)) == 0);
871
967
trace_object (self, "Cloning leader '%s'",
872
968
dee_shared_model_get_swarm_name (self));
874
g_dbus_connection_call(priv->connection,
875
dee_shared_model_get_swarm_name (self), // name
876
priv->model_path, // obj path
877
"com.canonical.Dee.Model", // iface
880
G_VARIANT_TYPE ("(sasaavauay(tt))"), // ret type
881
G_DBUS_CALL_FLAGS_NONE,
884
on_clone_received, // cb
885
g_object_ref (self)); // userdata
970
/* This shouldn't really happen when we have multiple connections, but let's
971
* have it here for consistency */
972
for (iter = priv->connections; iter != NULL; iter = iter->next)
974
g_dbus_connection_call((GDBusConnection*) iter->data,
975
dee_shared_model_get_swarm_name (self), // name
976
priv->model_path, // obj path
977
"com.canonical.Dee.Model", // iface
980
G_VARIANT_TYPE ("(sasaavauay(tt))"), // ret type
981
G_DBUS_CALL_FLAGS_NONE,
984
on_clone_received, // cb
985
g_object_ref (self)); // userdata
894
995
GVariant *parameters,
895
996
gpointer user_data)
998
DeeSharedModel *model;
999
const gchar *unique_name;
897
1001
g_return_if_fail (DEE_IS_SHARED_MODEL (user_data));
1003
unique_name = g_dbus_connection_get_unique_name (connection);
1005
trace_object (user_data, "%s: sender: %s, our unique_name: %s",
1006
__func__, sender_name, unique_name);
899
1008
/* Ignore signals from ourselves. We may get those because of the way
900
1009
* we set up the match rules */
901
if (g_strcmp0 (sender_name,
1010
if (unique_name != NULL && g_strcmp0 (sender_name,
902
1011
g_dbus_connection_get_unique_name (connection)) == 0)
906
1014
if (g_strcmp0 (signal_name, "Commit") == 0)
908
commit_transaction (DEE_SHARED_MODEL (user_data),
1016
model = DEE_SHARED_MODEL (user_data);
1017
commit_transaction (model, sender_name, parameters);
1019
if (g_slist_length (model->priv->connections) > 1)
1021
/* this is a server and a client (non-leader) just committed a change
1022
* to the model, let's invalidate all other clients */
1023
invalidate_peer (model, sender_name, connection);
913
1027
g_warning ("Unexpected signal %s.%s from %s",
1036
1149
if (dee_shared_model_is_leader (self))
1038
1151
g_warning ("Invalidating %s", sender_name);
1039
invalidate_peer (self, sender_name);
1152
invalidate_peer (self, sender_name, NULL);
1042
1155
g_variant_unref (transaction);
1245
1358
/* Call DBus method com.canonical.Dee.Model.Invalidate() on @sender_name */
1247
1360
invalidate_peer (DeeSharedModel *self,
1248
const gchar *sender_name)
1361
const gchar *sender_name,
1362
GDBusConnection *except)
1250
1364
DeeSharedModelPrivate *priv;
1252
1367
g_return_if_fail (DEE_IS_SHARED_MODEL (self));
1253
g_return_if_fail (sender_name != NULL);
1255
1369
if (!dee_shared_model_is_leader (self))
1262
1376
priv = self->priv;
1264
g_dbus_connection_call (priv->connection,
1267
"com.canonical.Dee.Model",
1270
NULL, /* reply type */
1271
G_DBUS_CALL_FLAGS_NONE,
1275
NULL); /* user data */
1378
// invalidate peers on all connections
1379
for (iter = priv->connections; iter != NULL; iter = iter->next)
1381
if (iter->data == except) continue;
1382
g_dbus_connection_call ((GDBusConnection*) iter->data,
1385
"com.canonical.Dee.Model",
1388
NULL, /* reply type */
1389
G_DBUS_CALL_FLAGS_NONE,
1393
NULL); /* user data */
1278
1397
/* Public Methods */
1431
* dee_shared_model_new_for_peer:
1432
* @peer: (transfer full): A #DeePeer instance.
1434
* Create a new empty shared model without any column schema associated.
1435
* The column schema will be set in one of two ways: firstly you may set it
1436
* manually with dee_model_set_schema() or secondly it will be set once
1437
* the first rows are exchanged with a peer model.
1439
* A #DeeSharedModel with a schema manually set has to be created before
1440
* creating more #DeeSharedModel with the same @name.
1442
* A shared model created with this constructor will store row data in a
1443
* suitably picked memory backed model.
1445
* Return value: (transfer full) (type DeeSharedModel): a new #DeeSharedModel
1448
dee_shared_model_new_for_peer (DeePeer *peer)
1453
g_return_val_if_fail (peer != NULL, NULL);
1455
back_end = (DeeModel*) dee_sequence_model_new ();
1457
self = g_object_new (DEE_TYPE_SHARED_MODEL,
1458
"back-end", back_end,
1462
g_object_unref (back_end);
1463
g_object_unref (peer);
1312
1469
* dee_shared_model_new_with_back_end:
1313
1470
* @name: (transfer none): A well known name to publish this model under.
1314
1471
* Models sharing this name will synchronize with each other
1319
1476
* Create a new shared model storing all data in @back_end.
1321
* In order to start synchronizing the new model with peer models you must call
1322
* dee_shared_model_connect() on it.
1478
* The model will start synchronizing with peer models as soon as possible and
1479
* the #DeeSharedModel:synchronized property will be set once finished.
1324
1481
* Return value: (transfer full) (type DeeSharedModel): a new #DeeSharedModel
1467
1625
priv = self->priv;
1468
1626
n_revisions = flush_revision_queue (DEE_MODEL (self));
1471
g_dbus_connection_flush_sync (priv->connection, NULL, &error);
1628
for (iter = priv->connections; iter != NULL; iter = iter->next)
1474
g_critical ("Error when flushing %u revisions of %s@%p: %s",
1475
n_revisions, G_OBJECT_TYPE_NAME (self), self, error->message);
1476
g_error_free (error);
1631
g_dbus_connection_flush_sync ((GDBusConnection*) iter->data, NULL, &error);
1634
g_critical ("Error when flushing %u revisions of %s@%p: %s",
1635
n_revisions, G_OBJECT_TYPE_NAME (self), self,
1637
g_error_free (error);
1480
1642
return n_revisions;