~mhr3/dee/vala-tags

« back to all changes in this revision

Viewing changes to dee/dee-shared-model.c

  • Committer: Tarmac
  • Author(s): Michal Hruby, Mikkel Kamstrup Erlandsen
  • Date: 2011-12-19 13:35:23 UTC
  • mfrom: (317.1.21 dee)
  • Revision ID: tarmac-20111219133523-kwsylfvhe4h38v4o
This branch implements the possibility of creating private connections that can be used to synchronize shared models. Fixes: https://bugs.launchpad.net/bugs/904299. Approved by Mikkel Kamstrup Erlandsen.

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
 * Authored by:
18
18
 *               Mikkel Kamstrup Erlandsen <mikkel.kamstrup@canonical.com>
19
19
 *               Neil Jagdish Patel <neil.patel@canonical.com>
 
20
 *               Michal Hruby <michal.hruby@canonical.com>
20
21
 */
21
22
 
22
23
/**
80
81
 **/
81
82
struct _DeeSharedModelPrivate
82
83
{
83
 
  DeePeer         *swarm;
84
 
  GDBusConnection *connection;
85
 
  gchar           *model_path;
 
84
  DeePeer    *swarm;
 
85
  GSList     *connections;
 
86
  gchar      *model_path;
86
87
  
87
88
  /* Buffer of DeeSharedModelRevisions that we keep in order to batch
88
89
   * our DBus signals. The invariant is that all buffered revisions
89
90
   * are of the same type */
90
91
  GSList     *revision_queue;
91
92
  guint       revision_queue_timeout_id;
92
 
  guint       dbus_signal_handler;
93
 
  guint       model_registration_id;
 
93
  guint       acquisition_timer_id;
94
94
  gulong      swarm_leader_handler;
95
 
  
 
95
  gulong      connection_acquired_handler;
 
96
  gulong      connection_closed_handler;
 
97
  GArray     *connection_infos;
 
98
 
96
99
  gboolean    synchronized;
97
100
  gboolean    found_first_peer;
98
101
  gboolean    suppress_remote_signals;
108
111
  DeeModel   *model;
109
112
} DeeSharedModelRevision;
110
113
 
 
114
typedef struct
 
115
{
 
116
  GDBusConnection *connection;
 
117
  guint            signal_subscription_id;
 
118
  guint            registration_id;
 
119
} DeeConnectionInfo;
111
120
/* Globals */
112
121
static GQuark           dee_shared_model_error_quark       = 0;
113
122
 
138
147
//static guint32 _signals[LAST_SIGNAL] = { 0 };
139
148
 
140
149
/* Forwards */
141
 
static void     on_bus_connection_acquired             (GObject      *source_object,
142
 
                                                        GAsyncResult *res,
143
 
                                                        gpointer      user_data);
 
150
static void     on_connection_acquired                 (DeeSharedModel  *self,
 
151
                                                        GDBusConnection *connection,
 
152
                                                        DeePeer         *peer);
 
153
 
 
154
static void     on_connection_closed                   (DeeSharedModel  *self,
 
155
                                                        GDBusConnection *connection,
 
156
                                                        DeePeer         *peer);
144
157
 
145
158
static void     commit_transaction                     (DeeSharedModel *self,
146
159
                                                        const gchar    *sender_name,
192
205
static void        reset_model                   (DeeModel       *self);
193
206
 
194
207
static void        invalidate_peer               (DeeSharedModel  *self,
195
 
                                                  const gchar     *sender_name);
 
208
                                                  const gchar     *sender_name,
 
209
                                                  GDBusConnection *except);
196
210
 
197
211
static gboolean    on_invalidate                 (DeeSharedModel  *self);
198
212
 
263
277
  DeeSharedModelRevision *rev;
264
278
  GError                 *error;
265
279
  GSList                 *iter;
 
280
  GSList                 *connection_iter;
266
281
  GVariant               *schema;
 
282
  GVariant               *transaction_variant;
267
283
  GVariantBuilder         aav, au, ay, transaction;
268
284
  guint64                 seqnum_begin = 0, seqnum_end = 0;
269
285
  guint                   n_cols, i;
278
294
   *    we'll assume the programmer knows this
279
295
   * 2) We are resetting the model - no problem
280
296
   */
281
 
  if (priv->connection == NULL)
 
297
  if (priv->connections == NULL)
282
298
    {
283
299
      trace_object (self, "Flushing revision queue, without a connection. "
284
300
                          "This will blow up unless you are the leader model");
368
384
  g_variant_builder_add_value (&transaction,
369
385
                               g_variant_new ("(tt)", seqnum_begin, seqnum_end));
370
386
 
371
 
  /* Throw a Commit signal on the bus */
372
 
  error = NULL;
373
 
  g_dbus_connection_emit_signal(priv->connection,
374
 
                                NULL,
375
 
                                priv->model_path,
376
 
                                "com.canonical.Dee.Model",
377
 
                                "Commit",
378
 
                                g_variant_builder_end (&transaction),
379
 
                                &error);
 
387
  transaction_variant = g_variant_builder_end (&transaction);
380
388
 
381
 
  if (error != NULL)
 
389
  /* Throw a Commit signal */
 
390
  for (connection_iter = priv->connections; connection_iter != NULL; 
 
391
       connection_iter = connection_iter->next)
382
392
    {
383
 
      g_critical ("Failed to emit DBus signal "
384
 
                  "com.canonical.Dee.Model.Commit: %s", error->message);
385
 
      g_error_free (error);
 
393
      error = NULL;
 
394
      g_dbus_connection_emit_signal((GDBusConnection*) connection_iter->data,
 
395
                                    NULL,
 
396
                                    priv->model_path,
 
397
                                    "com.canonical.Dee.Model",
 
398
                                    "Commit",
 
399
                                    transaction_variant,
 
400
                                    &error);
 
401
 
 
402
      if (error != NULL)
 
403
        {
 
404
          g_critical ("Failed to emit DBus signal "
 
405
                      "com.canonical.Dee.Model.Commit: %s", error->message);
 
406
          g_error_free (error);
 
407
        }
386
408
    }
387
409
 
388
410
  trace_object (self, "Flushed %"G_GUINT64_FORMAT" revisions. "
429
451
static void
430
452
dee_shared_model_finalize (GObject *object)
431
453
{
 
454
  guint i;
432
455
  DeeSharedModelPrivate *priv = DEE_SHARED_MODEL (object)->priv;
433
456
 
434
457
  /* Flush any pending revisions */
437
460
      flush_revision_queue (DEE_MODEL(object));
438
461
      priv->revision_queue = NULL;
439
462
    }
440
 
  
441
 
  if (priv->model_registration_id != 0 && priv->connection != NULL)
442
 
    {
443
 
      g_dbus_connection_unregister_object (priv->connection,
444
 
                                           priv->model_registration_id);
445
 
      priv->model_registration_id = 0;
446
 
    }
447
 
  if (priv->dbus_signal_handler != 0 && priv->connection != NULL)
448
 
    {
449
 
      g_dbus_connection_signal_unsubscribe(priv->connection,
450
 
                                           priv->dbus_signal_handler);
451
 
      priv->dbus_signal_handler = 0;
 
463
 
 
464
  if (priv->acquisition_timer_id != 0)
 
465
    {
 
466
      g_source_remove (priv->acquisition_timer_id);
 
467
      priv->acquisition_timer_id = 0;
 
468
    }
 
469
 
 
470
  if (priv->connection_acquired_handler)
 
471
    {
 
472
      g_signal_handler_disconnect (priv->swarm,
 
473
                                   priv->connection_acquired_handler);
 
474
      priv->connection_acquired_handler = 0;
 
475
    }
 
476
 
 
477
  if (priv->connection_closed_handler)
 
478
    {
 
479
      g_signal_handler_disconnect (priv->swarm, priv->connection_closed_handler);
 
480
      priv->connection_closed_handler = 0;
 
481
    }
 
482
 
 
483
  if (priv->connection_infos != NULL)
 
484
    {
 
485
      for (i = 0; i < priv->connection_infos->len; i++)
 
486
        {
 
487
          DeeConnectionInfo *info;
 
488
          info = &g_array_index (priv->connection_infos, DeeConnectionInfo, i);
 
489
          g_dbus_connection_unregister_object (info->connection,
 
490
                                               info->registration_id);
 
491
          g_dbus_connection_signal_unsubscribe (info->connection,
 
492
                                                info->signal_subscription_id);
 
493
        }
 
494
 
 
495
      g_array_unref (priv->connection_infos);
 
496
      priv->connection_infos = NULL;
452
497
    }
453
498
  if (priv->swarm_leader_handler != 0)
454
499
    {
459
504
      {
460
505
        g_free (priv->model_path);
461
506
      }
462
 
  if (priv->connection)
 
507
  if (priv->connections)
463
508
    {
464
 
      g_object_unref (priv->connection);
465
 
      priv->connection = NULL;
 
509
      g_slist_free (priv->connections);
 
510
      priv->connections = NULL;
466
511
    }
467
512
  if (priv->swarm)
468
513
    {
515
560
      g_value_set_object (value, priv->swarm);
516
561
      break;
517
562
    case PROP_SYNCHRONIZED:
518
 
          g_value_set_boolean (value, priv->synchronized);
519
 
          break;
 
563
      g_value_set_boolean (value, priv->synchronized);
 
564
      break;
520
565
    default:
521
566
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, id, pspec);
522
567
      break;
523
568
  }
524
569
}
525
570
 
 
571
static gboolean
 
572
iterate_connections (DeeSharedModel *self)
 
573
{
 
574
  DeeSharedModelPrivate *priv;
 
575
  GSList                *iter;
 
576
 
 
577
  g_return_val_if_fail (DEE_IS_SHARED_MODEL (self), FALSE);
 
578
  priv = self->priv;
 
579
 
 
580
  priv->connections = dee_peer_get_connections (priv->swarm);
 
581
 
 
582
  for (iter = priv->connections; iter != NULL; iter = iter->next)
 
583
    {
 
584
      on_connection_acquired (self, (GDBusConnection*) iter->data, priv->swarm);
 
585
    }
 
586
 
 
587
  priv->acquisition_timer_id = 0;
 
588
 
 
589
  return FALSE;
 
590
}
 
591
 
526
592
static void
527
 
dee_shared_model_constructed (GObject *self)
 
593
dee_shared_model_constructed (GObject *object)
528
594
{
529
 
  DeeSharedModel        *_self;
 
595
  DeeSharedModel        *self;
530
596
  DeeSharedModelPrivate *priv;
531
597
  gchar                 *dummy;
532
598
 
533
 
  _self = DEE_SHARED_MODEL (self);
534
 
  priv = _self->priv;
 
599
  /* GObjectClass has NULL 'constructed' member, but we add this check for
 
600
   * future robustness if we ever move to another base class */
 
601
  if (G_OBJECT_CLASS (dee_shared_model_parent_class)->constructed != NULL)
 
602
    G_OBJECT_CLASS (dee_shared_model_parent_class)->constructed (object);
 
603
 
 
604
  self = DEE_SHARED_MODEL (object);
 
605
  priv = self->priv;
535
606
 
536
607
  if (priv->swarm == NULL)
537
608
    {
546
617
                                    NULL);
547
618
  g_free (dummy);
548
619
 
549
 
  priv->swarm_leader_handler = g_signal_connect_swapped(priv->swarm,
550
 
                                                        "notify::swarm-leader",
551
 
                                                        G_CALLBACK (on_leader_changed),
552
 
                                                        self);
553
 
 
554
 
  /* Connect asynchronously to the bus */
555
 
  g_bus_get (G_BUS_TYPE_SESSION,
556
 
             NULL,
557
 
             on_bus_connection_acquired,
558
 
             g_object_ref (self)); // ref to stay alive during async call
559
 
 
560
 
  /* GObjectClass has NULL 'constructed' member, but we add this check for
561
 
   * future robustness if we ever move to another base class */
562
 
  if (G_OBJECT_CLASS (dee_shared_model_parent_class)->constructed != NULL)
563
 
    G_OBJECT_CLASS (dee_shared_model_parent_class)->constructed (self);
 
620
  priv->swarm_leader_handler =
 
621
    g_signal_connect_swapped (priv->swarm, "notify::swarm-leader",
 
622
                              G_CALLBACK (on_leader_changed), self);
 
623
 
 
624
  priv->connection_acquired_handler =
 
625
    g_signal_connect_swapped (priv->swarm, "connection-acquired",
 
626
                              G_CALLBACK (on_connection_acquired), self);
 
627
 
 
628
  priv->connection_closed_handler =
 
629
    g_signal_connect_swapped (priv->swarm, "connection-closed",
 
630
                              G_CALLBACK (on_connection_closed), self);
 
631
 
 
632
  /* we don't want to invoke on_connection_acquired from here, it would mean
 
633
   * emitting important signal when inside g_object_new */
 
634
  /* using G_PRIORITY_DEFAULT will ensure that this will be dispatched before
 
635
   * we'll have a chance to acquire new connections... FIXME: right? */
 
636
  priv->acquisition_timer_id = g_idle_add_full (G_PRIORITY_DEFAULT,
 
637
      (GSourceFunc) iterate_connections, self, NULL);
564
638
}
565
639
 
566
640
static void
612
686
 
613
687
  priv = self->priv = DEE_SHARED_MODEL_GET_PRIVATE (self);
614
688
 
615
 
  priv->synchronized = FALSE;
616
689
  priv->swarm = NULL;
617
690
  priv->model_path = NULL;
618
691
 
619
692
  priv->revision_queue = NULL;
620
693
  priv->revision_queue_timeout_id = 0;
621
 
  priv->dbus_signal_handler = 0;
622
 
  priv->model_registration_id = 0;
623
694
  priv->swarm_leader_handler = 0;
624
695
 
625
696
  priv->synchronized = FALSE;
629
700
  if (!dee_shared_model_error_quark)
630
701
    dee_shared_model_error_quark = g_quark_from_string ("dbus-model-error");
631
702
 
632
 
  priv->connection = NULL;
 
703
  priv->connections = NULL;
 
704
  priv->connection_infos = g_array_new (FALSE, TRUE, sizeof (DeeConnectionInfo));
633
705
  
634
706
  /* Connect to our own signals so we can queue up revisions to be emitted
635
707
   * on the bus */
689
761
};
690
762
 
691
763
static void
692
 
on_bus_connection_acquired (GObject      *source_object,
693
 
                            GAsyncResult *res,
694
 
                            gpointer      user_data)
 
764
on_connection_acquired (DeeSharedModel *self,
 
765
                        GDBusConnection *connection,
 
766
                        DeePeer *peer)
695
767
{
696
 
  DeeSharedModel        *self;
697
768
  DeeSharedModelPrivate *priv;
698
 
  GError                *error;
 
769
  DeeConnectionInfo      connection_info;
699
770
  GDBusNodeInfo         *model_introspection_data;
 
771
  guint                  dbus_signal_handler;
 
772
  guint                  model_registration_id;
700
773
 
701
774
  /* Keep the parsed introspection data of the Model interface around */
702
775
  static GDBusInterfaceInfo *model_interface_info = NULL;
703
776
 
704
 
  g_return_if_fail (DEE_IS_SHARED_MODEL (user_data));
 
777
  g_return_if_fail (DEE_IS_SHARED_MODEL (self));
705
778
 
706
 
  self = DEE_SHARED_MODEL (user_data);
707
779
  priv = self->priv;
708
780
 
709
 
  if (priv->connection)
710
 
    {
711
 
      g_critical ("Internal error in DeeSharedModel. "
712
 
                  "DBus connection acquired twice.");
713
 
 
714
 
      /* We don't know whether or not to g_object_unref(self) at this point
715
 
       * since the internal state is messed up. We don't do the unref to try
716
 
       * and avoid double freeing self */
717
 
 
718
 
      return;
719
 
    }
720
 
 
721
 
  error = NULL;
722
 
  priv->connection = g_bus_get_finish (res, &error);
723
 
 
724
 
  if (error != NULL)
725
 
    {
726
 
      g_critical ("DeeSharedModel@%p failed to connect to session bus: %s",
727
 
                  self, error->message);
728
 
      g_error_free (error);
729
 
      g_object_unref (self); // held during async call
730
 
      return;
731
 
    }
 
781
  if (connection == NULL)
 
782
    {
 
783
      g_warning ("Internal error in DeeSharedModel. %s called with NULL "
 
784
                 "connection", __func__);
 
785
      return;
 
786
    }
 
787
 
 
788
  /* Update our list of connections */
 
789
  if (priv->connections) g_slist_free (priv->connections);
 
790
  priv->connections = dee_peer_get_connections (priv->swarm);
732
791
 
733
792
  /* Listen for changes from the peers in the same swarm.
734
793
   * We do this by matching arg0 with the swarm name */
735
 
  priv->dbus_signal_handler = g_dbus_connection_signal_subscribe (
736
 
                                         priv->connection,
 
794
  dbus_signal_handler = g_dbus_connection_signal_subscribe (
 
795
                                         connection,
737
796
                                         NULL,                // sender
738
797
                                         "com.canonical.Dee.Model", // iface
739
798
                                         NULL,                // member
758
817
    }
759
818
 
760
819
  /* Export the model on the bus */
761
 
  priv->model_registration_id =
762
 
      g_dbus_connection_register_object (priv->connection,
 
820
  model_registration_id =
 
821
      g_dbus_connection_register_object (connection,
763
822
                                         priv->model_path, /* object path */
764
823
                                         model_interface_info,
765
824
                                         &model_interface_vtable,
767
826
                                         NULL,  /* user_data_free_func */
768
827
                                         NULL); /* GError** */
769
828
 
 
829
  connection_info.connection = connection;
 
830
  connection_info.signal_subscription_id = dbus_signal_handler;
 
831
  connection_info.registration_id = model_registration_id;
 
832
  g_array_append_val (priv->connection_infos, connection_info);
 
833
 
770
834
  /* If we are swarm leaders and we have column type info we are ready by now.
771
835
   * Otherwise we will be ready when we receive the model clone from the leader
772
836
   */
773
837
  if (dee_peer_is_swarm_leader (priv->swarm))
774
838
    {
775
 
      if (dee_model_get_n_columns (DEE_MODEL (self)) > 0)
 
839
      if (dee_model_get_n_columns (DEE_MODEL (self)) > 0 && !priv->synchronized)
776
840
        {
777
841
          priv->synchronized = TRUE;
778
842
          g_object_notify (G_OBJECT (self), "synchronized");
789
853
    {
790
854
      // FIXME: There's no known leader
791
855
    }
792
 
 
793
 
  g_object_unref (self); // held self-ref during async call
 
856
}
 
857
 
 
858
static void
 
859
on_connection_closed (DeeSharedModel  *self,
 
860
                      GDBusConnection *connection,
 
861
                      DeePeer         *peer)
 
862
{
 
863
  DeeSharedModelPrivate *priv;
 
864
  guint i;
 
865
 
 
866
  g_return_if_fail (DEE_IS_SHARED_MODEL (self));
 
867
 
 
868
  priv = self->priv;
 
869
 
 
870
  /* Update our list of connections */
 
871
  if (priv->connections) g_slist_free (priv->connections);
 
872
  priv->connections = dee_peer_get_connections (priv->swarm);
 
873
 
 
874
  /* Disconnect signals etc */
 
875
  for (i = 0; i < priv->connection_infos->len; i++)
 
876
    {
 
877
      DeeConnectionInfo *info;
 
878
      info = &g_array_index (priv->connection_infos, DeeConnectionInfo, i);
 
879
      if (info->connection == connection)
 
880
        {
 
881
          g_dbus_connection_unregister_object (info->connection,
 
882
                                               info->registration_id);
 
883
          g_dbus_connection_signal_unsubscribe (info->connection,
 
884
                                                info->signal_subscription_id);
 
885
          /* remove the item */
 
886
          g_array_remove_index (priv->connection_infos, i);
 
887
          break;
 
888
        }
 
889
    }
794
890
}
795
891
 
796
892
/* Callback for clone_leader() */
811
907
  priv = self->priv;
812
908
 
813
909
  error = NULL;
814
 
  transaction = g_dbus_connection_call_finish (G_DBUS_CONNECTION(source_object),
 
910
  transaction = g_dbus_connection_call_finish (G_DBUS_CONNECTION (source_object),
815
911
                                               res, &error);
816
912
 
817
913
  if (error != NULL)
859
955
clone_leader (DeeSharedModel *self)
860
956
{
861
957
  DeeSharedModelPrivate *priv;
 
958
  GSList                *iter;
862
959
 
863
960
  g_return_if_fail (DEE_IS_SHARED_MODEL (self));
864
961
  g_return_if_fail (dee_peer_get_swarm_leader (self->priv->swarm) != NULL);
865
 
  g_return_if_fail (self->priv->connection != NULL);
866
962
  g_return_if_fail (self->priv->revision_queue == NULL);
867
963
  g_return_if_fail (dee_model_get_n_rows (DEE_MODEL (self)) == 0);
868
964
 
871
967
  trace_object (self, "Cloning leader '%s'",
872
968
                dee_shared_model_get_swarm_name (self));
873
969
 
874
 
  g_dbus_connection_call(priv->connection,
875
 
                         dee_shared_model_get_swarm_name (self), // name
876
 
                         priv->model_path,                       // obj path
877
 
                         "com.canonical.Dee.Model",              // iface
878
 
                         "Clone",                                // member
879
 
                         NULL,                                   // args
880
 
                         G_VARIANT_TYPE ("(sasaavauay(tt))"),    // ret type
881
 
                         G_DBUS_CALL_FLAGS_NONE,
882
 
                         -1,                                     // timeout
883
 
                         NULL,                                   // cancel
884
 
                         on_clone_received,                      // cb
885
 
                         g_object_ref (self));                   // userdata
 
970
  /* This shouldn't really happen when we have multiple connections, but let's
 
971
   * have it here for consistency */
 
972
  for (iter = priv->connections; iter != NULL; iter = iter->next)
 
973
    {
 
974
      g_dbus_connection_call((GDBusConnection*) iter->data,
 
975
                             dee_shared_model_get_swarm_name (self), // name
 
976
                             priv->model_path,                       // obj path
 
977
                             "com.canonical.Dee.Model",              // iface
 
978
                             "Clone",                                // member
 
979
                             NULL,                                   // args
 
980
                             G_VARIANT_TYPE ("(sasaavauay(tt))"),    // ret type
 
981
                             G_DBUS_CALL_FLAGS_NONE,
 
982
                             -1,                                     // timeout
 
983
                             NULL,                                   // cancel
 
984
                             on_clone_received,                      // cb
 
985
                             g_object_ref (self));                   // userdata
 
986
    }
886
987
}
887
988
 
888
989
static void
894
995
                         GVariant        *parameters,
895
996
                         gpointer         user_data)
896
997
{
 
998
  DeeSharedModel *model;
 
999
  const gchar    *unique_name;
 
1000
 
897
1001
  g_return_if_fail (DEE_IS_SHARED_MODEL (user_data));
898
1002
 
 
1003
  unique_name = g_dbus_connection_get_unique_name (connection);
 
1004
 
 
1005
  trace_object (user_data, "%s: sender: %s, our unique_name: %s",
 
1006
      __func__, sender_name, unique_name);
 
1007
 
899
1008
  /* Ignore signals from our selves. We may get those because of the way
900
1009
   * we set up the match rules */
901
 
  if (g_strcmp0 (sender_name,
 
1010
  if (unique_name != NULL && g_strcmp0 (sender_name,
902
1011
                 g_dbus_connection_get_unique_name (connection)) == 0)
903
1012
    return;
904
1013
 
905
 
 
906
1014
  if (g_strcmp0 (signal_name, "Commit") == 0)
907
1015
    {
908
 
      commit_transaction (DEE_SHARED_MODEL (user_data),
909
 
                          sender_name,
910
 
                          parameters);
 
1016
      model = DEE_SHARED_MODEL (user_data);
 
1017
      commit_transaction (model, sender_name, parameters);
 
1018
 
 
1019
      if (g_slist_length (model->priv->connections) > 1)
 
1020
      {
 
1021
        /* this is a server and a client (non-leader) just committed a change
 
1022
         * to the model, let's invalidate all other clients */
 
1023
        invalidate_peer (model, sender_name, connection);
 
1024
      }
911
1025
    }
912
1026
  else
913
1027
    g_warning ("Unexpected signal %s.%s from %s",
959
1073
  gint                   i, j;
960
1074
 
961
1075
  g_return_if_fail (DEE_IS_SHARED_MODEL (self));
962
 
  g_return_if_fail (sender_name != NULL);
963
1076
  g_return_if_fail (transaction != NULL);
964
1077
 
965
1078
  g_variant_ref_sink (transaction);
1036
1149
      if (dee_shared_model_is_leader (self))
1037
1150
        {
1038
1151
          g_warning ("Invalidating %s", sender_name);
1039
 
          invalidate_peer (self, sender_name);
 
1152
          invalidate_peer (self, sender_name, NULL);
1040
1153
        }
1041
1154
 
1042
1155
      g_variant_unref (transaction);
1245
1358
/* Call DBus method com.canonical.Dee.Model.Invalidate() on @sender_name */
1246
1359
static void
1247
1360
invalidate_peer (DeeSharedModel  *self,
1248
 
                 const gchar     *sender_name)
 
1361
                 const gchar     *sender_name,
 
1362
                 GDBusConnection *except)
1249
1363
{
1250
1364
  DeeSharedModelPrivate *priv;
 
1365
  GSList                *iter;
1251
1366
 
1252
1367
  g_return_if_fail (DEE_IS_SHARED_MODEL (self));
1253
 
  g_return_if_fail (sender_name != NULL);
1254
1368
 
1255
1369
  if (!dee_shared_model_is_leader (self))
1256
1370
    {
1261
1375
 
1262
1376
  priv = self->priv;
1263
1377
 
1264
 
  g_dbus_connection_call (priv->connection,
1265
 
                          sender_name,
1266
 
                          priv->model_path,
1267
 
                          "com.canonical.Dee.Model",
1268
 
                          "Invalidate",
1269
 
                          NULL,                      /* params */
1270
 
                          NULL,                      /* reply type */
1271
 
                          G_DBUS_CALL_FLAGS_NONE,
1272
 
                          -1,                        /* timeout */
1273
 
                          NULL,                      /* cancel */
1274
 
                          NULL,                      /* cb */
1275
 
                          NULL);                     /* user data */
 
1378
  // invalidate peers on all connections
 
1379
  for (iter = priv->connections; iter != NULL; iter = iter->next)
 
1380
    {
 
1381
      if (iter->data == except) continue;
 
1382
      g_dbus_connection_call ((GDBusConnection*) iter->data,
 
1383
                              sender_name,
 
1384
                              priv->model_path,
 
1385
                              "com.canonical.Dee.Model",
 
1386
                              "Invalidate",
 
1387
                              NULL,                      /* params */
 
1388
                              NULL,                      /* reply type */
 
1389
                              G_DBUS_CALL_FLAGS_NONE,
 
1390
                              -1,                        /* timeout */
 
1391
                              NULL,                      /* cancel */
 
1392
                              NULL,                      /* cb */
 
1393
                              NULL);                     /* user data */
 
1394
    }
1276
1395
}
1277
1396
 
1278
1397
/* Public Methods */
1309
1428
}
1310
1429
 
1311
1430
/**
 
1431
 * dee_shared_model_new_for_peer:
 
1432
 * @peer: (transfer full): A #DeePeer instance.
 
1433
 *
 
1434
 * Create a new empty shared model without any column schema associated.
 
1435
 * The column schema will be set in one of two ways: firstly you may set it
 
1436
 * manually with dee_model_set_schema() or secondly it will be set once
 
1437
 * the first rows are exchanged with a peer model.
 
1438
 * 
 
1439
 * A #DeeSharedModel with a schema manually set has to be created before
 
1440
 * creating more #DeeSharedModel with the same @name.
 
1441
 *
 
1442
 * A shared model created with this constructor will store row data in a
 
1443
 * suitably picked memory backed model.
 
1444
 *
 
1445
 * Return value: (transfer full) (type DeeSharedModel): a new #DeeSharedModel
 
1446
 */
 
1447
DeeModel*
 
1448
dee_shared_model_new_for_peer (DeePeer *peer)
 
1449
{
 
1450
  DeeModel *self;
 
1451
  DeeModel *back_end;
 
1452
 
 
1453
  g_return_val_if_fail (peer != NULL, NULL);
 
1454
 
 
1455
  back_end = (DeeModel*) dee_sequence_model_new ();
 
1456
 
 
1457
  self = g_object_new (DEE_TYPE_SHARED_MODEL,
 
1458
                       "back-end", back_end,
 
1459
                       "peer", peer,
 
1460
                       NULL);
 
1461
 
 
1462
  g_object_unref (back_end);
 
1463
  g_object_unref (peer);
 
1464
 
 
1465
  return self;
 
1466
}
 
1467
 
 
1468
/**
1312
1469
 * dee_shared_model_new_with_back_end:
1313
1470
 * @name: (transfer none): A well known name to publish this model under.
1314
1471
 *        Models sharing this name will synchronize with each other
1318
1475
 *
1319
1476
 * Create a new shared model storing all data in @back_end.
1320
1477
 *
1321
 
 * In order to start synchronizing the new model with peer models you must call
1322
 
 * dee_shared_model_connect() on it.
 
1478
 * The model will start synchronizing with peer models as soon as possible and
 
1479
 * the #DeeSharedModel:synchronized property will be set once finished.
1323
1480
 *
1324
1481
 * Return value: (transfer full) (type DeeSharedModel): a new #DeeSharedModel
1325
1482
 */
1460
1617
{
1461
1618
  DeeSharedModelPrivate *priv;
1462
1619
  GError                *error;
 
1620
  GSList                *iter;
1463
1621
  guint                  n_revisions;
1464
1622
 
1465
1623
  g_return_val_if_fail (DEE_IS_SHARED_MODEL (self), 0);
1467
1625
  priv = self->priv;
1468
1626
  n_revisions = flush_revision_queue (DEE_MODEL (self));
1469
1627
 
1470
 
  error = NULL;
1471
 
  g_dbus_connection_flush_sync (priv->connection, NULL, &error);
1472
 
  if (error)
 
1628
  for (iter = priv->connections; iter != NULL; iter = iter->next)
1473
1629
    {
1474
 
      g_critical ("Error when flushing %u revisions of %s@%p: %s",
1475
 
                  n_revisions, G_OBJECT_TYPE_NAME (self), self, error->message);
1476
 
      g_error_free (error);
1477
 
      return 0;
 
1630
      error = NULL;
 
1631
      g_dbus_connection_flush_sync ((GDBusConnection*) iter->data, NULL, &error);
 
1632
      if (error)
 
1633
        {
 
1634
          g_critical ("Error when flushing %u revisions of %s@%p: %s",
 
1635
                      n_revisions, G_OBJECT_TYPE_NAME (self), self,
 
1636
                      error->message);
 
1637
          g_error_free (error);
 
1638
          return 0;
 
1639
        }
1478
1640
    }
1479
1641
 
1480
1642
  return n_revisions;