~ubuntu-branches/ubuntu/raring/tracker/raring

« back to all changes in this revision

Viewing changes to src/libtracker-miner/tracker-miner-fs.c

  • Committer: Package Import Robot
  • Author(s): Michael Biebl
  • Date: 2011-08-26 00:26:14 UTC
  • mfrom: (4.3.17 sid)
  • Revision ID: package-import@ubuntu.com-20110826002614-4qjfs9jhh5gs4p13
Tags: 0.10.24-1
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
31
31
#include "tracker-monitor.h"
32
32
#include "tracker-utils.h"
33
33
#include "tracker-thumbnailer.h"
34
 
#include "tracker-miner-fs-processing-pool.h"
 
34
#include "tracker-priority-queue.h"
 
35
#include "tracker-task-pool.h"
 
36
#include "tracker-sparql-buffer.h"
35
37
 
36
38
/* If defined will print the tree from GNode while running */
37
39
#ifdef CRAWLED_TREE_ENABLE_TRACE
98
100
/* Default processing pool limits to be set */
99
101
#define DEFAULT_WAIT_POOL_LIMIT 1
100
102
#define DEFAULT_READY_POOL_LIMIT 1
101
 
#define DEFAULT_N_REQUESTS_POOL_LIMIT 10
102
103
 
103
104
/* Put tasks processing at a lower priority so other events
104
105
 * (timeouts, monitor events, etc...) are guaranteed to be
125
126
} ItemMovedData;
126
127
 
127
128
/* Payload queued for one pending writeback operation on a file.
 * Instances are built by item_writeback_data_new() and released by
 * item_writeback_data_free().
 */
typedef struct {
 
129
        GFile     *file;            /* own reference to the target file */
 
130
        GPtrArray *results;         /* own reference to the result rows */
 
131
        GStrv      rdf_types;       /* private copy of the RDF type list */
 
132
        GCancellable *cancellable;  /* created together with the item */
 
133
        guint notified : 1;         /* starts out FALSE */
 
134
} ItemWritebackData;
 
135
 
 
136
/* A directory tracked by the miner, with a small inline reference count. */
typedef struct {
128
137
        GFile    *file;          /* the directory itself */
129
138
        guint     recurse : 1;   /* when set, files below the directory also match it */
130
139
        guint     ref_count : 7; /* 7-bit counter, so at most 127 references */
131
140
} DirectoryData;
132
141
 
133
142
typedef struct {
 
143
        GFile *file;
134
144
        gchar *urn;
135
145
        gchar *parent_urn;
 
146
        gint priority;
136
147
        GCancellable *cancellable;
137
148
        TrackerSparqlBuilder *builder;
138
149
        TrackerMiner *miner;
169
180
        TrackerMonitor *monitor;
170
181
        TrackerCrawler *crawler;
171
182
 
172
 
        GQueue         *crawled_directories;
 
183
        TrackerPriorityQueue *crawled_directories;
173
184
 
174
185
        /* File queues for indexer */
175
 
        GQueue         *items_created;
176
 
        GQueue         *items_updated;
177
 
        GQueue         *items_deleted;
178
 
        GQueue         *items_moved;
 
186
        TrackerPriorityQueue *items_created;
 
187
        TrackerPriorityQueue *items_updated;
 
188
        TrackerPriorityQueue *items_deleted;
 
189
        TrackerPriorityQueue *items_moved;
 
190
        TrackerPriorityQueue *items_writeback;
179
191
#ifdef EVENT_QUEUE_ENABLE_TRACE
180
192
        guint           queue_status_timeout_id;
181
193
#endif /* EVENT_QUEUE_ENABLE_TRACE */
184
196
 
185
197
        GQuark          quark_ignore_file;
186
198
        GQuark          quark_attribute_updated;
187
 
        GQuark          quark_check_existence;
 
199
        GQuark          quark_directory_found_crawling;
188
200
 
189
201
        GList          *config_directories;
190
202
 
191
 
        GList          *directories;
 
203
        TrackerPriorityQueue *directories;
192
204
        DirectoryData  *current_directory;
193
205
 
194
206
        GTimer         *timer;
198
210
 
199
211
        gdouble         throttle;
200
212
 
201
 
        TrackerProcessingPool *processing_pool;
 
213
        /* Extraction tasks */
 
214
        TrackerTaskPool *task_pool;
 
215
        GList *extraction_tasks;
 
216
 
 
217
        /* Writeback tasks */
 
218
        TrackerTaskPool *writeback_pool;
 
219
 
 
220
        /* Sparql insertion tasks */
 
221
        TrackerSparqlBuffer *sparql_buffer;
 
222
        guint sparql_buffer_limit;
202
223
 
203
224
        /* URI mtime cache */
204
225
        GFile          *current_mtime_cache_parent;
250
271
        QUEUE_DELETED,
251
272
        QUEUE_MOVED,
252
273
        QUEUE_IGNORE_NEXT_UPDATE,
253
 
        QUEUE_WAIT
 
274
        QUEUE_WAIT,
 
275
        QUEUE_WRITEBACK
254
276
} QueueState;
255
277
 
256
278
enum {
262
284
        PROCESS_FILE_ATTRIBUTES,
263
285
        IGNORE_NEXT_UPDATE_FILE,
264
286
        FINISHED,
 
287
        WRITEBACK_FILE,
265
288
        LAST_SIGNAL
266
289
};
267
290
 
270
293
        PROP_THROTTLE,
271
294
        PROP_WAIT_POOL_LIMIT,
272
295
        PROP_READY_POOL_LIMIT,
273
 
        PROP_N_REQUESTS_POOL_LIMIT,
274
296
        PROP_MTIME_CHECKING,
275
297
        PROP_INITIAL_CRAWLING
276
298
};
277
299
 
 
300
static void           miner_fs_initable_iface_init        (GInitableIface       *iface);
 
301
 
278
302
static void           fs_finalize                         (GObject              *object);
279
303
static void           fs_set_property                     (GObject              *object,
280
304
                                                           guint                 prop_id,
302
326
static ItemMovedData *item_moved_data_new                 (GFile                *file,
303
327
                                                           GFile                *source_file);
304
328
static void           item_moved_data_free                (ItemMovedData        *data);
 
329
static void           item_writeback_data_free            (ItemWritebackData    *data);
305
330
static void           monitor_item_created_cb             (TrackerMonitor       *monitor,
306
331
                                                           GFile                *file,
307
332
                                                           gboolean              is_directory,
357
382
static gboolean       should_recurse_for_directory            (TrackerMinerFS *fs,
358
383
                                                               GFile          *file);
359
384
static void           tracker_miner_fs_directory_add_internal (TrackerMinerFS *fs,
360
 
                                                               GFile          *file);
 
385
                                                               GFile          *file,
 
386
                                                               gint            priority);
361
387
static gboolean       miner_fs_has_children_without_parent (TrackerMinerFS *fs,
362
388
                                                            GFile          *file);
363
389
 
364
 
static void           processing_pool_cancel_foreach          (gpointer        data,
 
390
static void           task_pool_cancel_foreach                (gpointer        data,
365
391
                                                               gpointer        user_data);
366
392
 
367
393
 
 
394
static GInitableIface* miner_fs_initable_parent_iface;
368
395
static guint signals[LAST_SIGNAL] = { 0, };
369
396
 
370
 
G_DEFINE_ABSTRACT_TYPE (TrackerMinerFS, tracker_miner_fs, TRACKER_TYPE_MINER)
 
397
G_DEFINE_ABSTRACT_TYPE_WITH_CODE (TrackerMinerFS, tracker_miner_fs, TRACKER_TYPE_MINER,
 
398
                                  G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE,
 
399
                                                         miner_fs_initable_iface_init));
371
400
 
372
401
static void
373
402
tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
415
444
                                                            1, G_MAXUINT, DEFAULT_READY_POOL_LIMIT,
416
445
                                                            G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
417
446
        g_object_class_install_property (object_class,
418
 
                                         PROP_N_REQUESTS_POOL_LIMIT,
419
 
                                         g_param_spec_uint ("processing-pool-requests-limit",
420
 
                                                            "Processing pool limit for number of requests",
421
 
                                                            "Maximum number of SPARQL requests that can be sent "
422
 
                                                            "to the store in parallel.",
423
 
                                                            1, G_MAXUINT, DEFAULT_N_REQUESTS_POOL_LIMIT,
424
 
                                                            G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
425
 
        g_object_class_install_property (object_class,
426
447
                                         PROP_MTIME_CHECKING,
427
448
                                         g_param_spec_boolean ("mtime-checking",
428
449
                                                               "Mtime checking",
616
637
         *          %FALSE on failure
617
638
         *
618
639
         * Since: 0.8
 
640
         * Deprecated: 0.12
619
641
         **/
620
642
        signals[IGNORE_NEXT_UPDATE_FILE] =
621
643
                g_signal_new ("ignore-next-update-file",
656
678
                              G_TYPE_UINT,
657
679
                              G_TYPE_UINT);
658
680
 
 
681
        /**
 
682
         * TrackerMinerFS::writeback-file:
 
683
         * @miner_fs: the #TrackerMinerFS
 
684
         * @file: a #GFile
 
685
         * @rdf_types: the set of RDF types
 
686
         * @results: a set of results prepared by the preparation query
 
687
         * @cancellable: a #GCancellable
 
688
         *
 
689
         * The ::writeback-file signal is emitted whenever a file must be written
 
690
         * back
 
691
         *
 
692
         * Returns: %TRUE on success, %FALSE otherwise
 
693
         *
 
694
         * Since: 0.10.20
 
695
         **/
 
696
        signals[WRITEBACK_FILE] =
 
697
                g_signal_new ("writeback-file",
 
698
                              G_OBJECT_CLASS_TYPE (object_class),
 
699
                              G_SIGNAL_RUN_LAST,
 
700
                              G_STRUCT_OFFSET (TrackerMinerFSClass, writeback_file),
 
701
                              NULL,
 
702
                              NULL,
 
703
                              tracker_marshal_BOOLEAN__OBJECT_BOXED_BOXED_OBJECT,
 
704
                              G_TYPE_BOOLEAN,
 
705
                              4,
 
706
                              G_TYPE_FILE,
 
707
                              G_TYPE_STRV,
 
708
                              G_TYPE_PTR_ARRAY,
 
709
                              G_TYPE_CANCELLABLE);
 
710
 
659
711
        g_type_class_add_private (object_class, sizeof (TrackerMinerFSPrivate));
660
712
}
661
713
 
668
720
 
669
721
        priv = object->priv;
670
722
 
671
 
        priv->crawled_directories = g_queue_new ();
672
 
        priv->items_created = g_queue_new ();
673
 
        priv->items_updated = g_queue_new ();
674
 
        priv->items_deleted = g_queue_new ();
675
 
        priv->items_moved = g_queue_new ();
 
723
        priv->crawled_directories = tracker_priority_queue_new ();
 
724
        priv->items_created = tracker_priority_queue_new ();
 
725
        priv->items_updated = tracker_priority_queue_new ();
 
726
        priv->items_deleted = tracker_priority_queue_new ();
 
727
        priv->items_moved = tracker_priority_queue_new ();
 
728
        priv->items_writeback = tracker_priority_queue_new ();
 
729
 
 
730
        priv->directories = tracker_priority_queue_new ();
676
731
 
677
732
#ifdef EVENT_QUEUE_ENABLE_TRACE
678
733
        priv->queue_status_timeout_id = g_timeout_add_seconds (EVENT_QUEUE_STATUS_TIMEOUT_SECS,
684
739
                                                                (GDestroyNotify) g_free,
685
740
                                                                (GDestroyNotify) NULL);
686
741
 
687
 
        /* Create processing pool */
688
 
        priv->processing_pool = tracker_processing_pool_new (object,
689
 
                                                             DEFAULT_WAIT_POOL_LIMIT,
690
 
                                                             DEFAULT_READY_POOL_LIMIT,
691
 
                                                             DEFAULT_N_REQUESTS_POOL_LIMIT);
 
742
        /* Create processing pools */
 
743
        priv->task_pool = tracker_task_pool_new (DEFAULT_WAIT_POOL_LIMIT);
 
744
        priv->writeback_pool = tracker_task_pool_new (DEFAULT_WAIT_POOL_LIMIT);
692
745
 
693
746
        /* Set up the crawlers now we have config and hal */
694
747
        priv->crawler = tracker_crawler_new ();
729
782
                          object);
730
783
 
731
784
        priv->quark_ignore_file = g_quark_from_static_string ("tracker-ignore-file");
732
 
        priv->quark_check_existence = g_quark_from_static_string ("tracker-check-existence");
 
785
        priv->quark_directory_found_crawling = g_quark_from_static_string ("tracker-directory-found-crawling");
733
786
        priv->quark_attribute_updated = g_quark_from_static_string ("tracker-attribute-updated");
734
787
 
735
788
        priv->iri_cache = g_hash_table_new_full (g_file_hash,
747
800
        priv->dirs_without_parent = NULL;
748
801
}
749
802
 
 
803
/*
 * GInitable init() implementation for TrackerMinerFS.
 *
 * Chains up to the parent interface's init() and returns FALSE as soon as
 * that fails. Otherwise it reads the "processing-pool-ready-limit"
 * property and creates priv->sparql_buffer on the miner's connection with
 * that value as its limit, then returns TRUE.
 */
static gboolean
 
804
miner_fs_initable_init (GInitable     *initable,
 
805
                        GCancellable  *cancellable,
 
806
                        GError       **error)
 
807
{
 
808
        TrackerMinerFSPrivate *priv;
 
809
        guint limit;
 
810
 
 
811
        /* Parent initialization must succeed before anything is set up here. */
        if (!miner_fs_initable_parent_iface->init (initable, cancellable, error)) {
 
812
                return FALSE;
 
813
        }
 
814
 
 
815
        priv = TRACKER_MINER_FS_GET_PRIVATE (initable);
 
816
 
 
817
        /* The buffer is sized from the property value rather than a constant. */
        g_object_get (initable, "processing-pool-ready-limit", &limit, NULL);
 
818
        priv->sparql_buffer = tracker_sparql_buffer_new (tracker_miner_get_connection (TRACKER_MINER (initable)),
 
819
                                                         limit);
 
820
        return TRUE;
 
821
}
 
822
 
 
823
/*
 * GInitableIface setup: remembers the parent interface in
 * miner_fs_initable_parent_iface (so miner_fs_initable_init() can chain
 * up) and installs miner_fs_initable_init() as the init vfunc.
 */
static void
 
824
miner_fs_initable_iface_init (GInitableIface *iface)
 
825
{
 
826
        miner_fs_initable_parent_iface = g_type_interface_peek_parent (iface);
 
827
        iface->init = miner_fs_initable_init;
 
828
}
 
829
 
750
830
static void
751
831
fs_finalize (GObject *object)
752
832
{
774
854
        if (priv->current_mtime_cache_parent)
775
855
                g_object_unref (priv->current_mtime_cache_parent);
776
856
 
777
 
        if (priv->directories) {
778
 
                g_list_foreach (priv->directories, (GFunc) directory_data_unref, NULL);
779
 
                g_list_free (priv->directories);
780
 
        }
 
857
        tracker_priority_queue_foreach (priv->directories,
 
858
                                        (GFunc) directory_data_unref,
 
859
                                        NULL);
 
860
        tracker_priority_queue_unref (priv->directories);
781
861
 
782
862
        if (priv->config_directories) {
783
863
                g_list_foreach (priv->config_directories, (GFunc) directory_data_unref, NULL);
784
864
                g_list_free (priv->config_directories);
785
865
        }
786
866
 
787
 
        g_queue_foreach (priv->crawled_directories, (GFunc) crawled_directory_data_free, NULL);
788
 
        g_queue_free (priv->crawled_directories);
 
867
        tracker_priority_queue_foreach (priv->crawled_directories,
 
868
                                        (GFunc) crawled_directory_data_free,
 
869
                                        NULL);
 
870
        tracker_priority_queue_unref (priv->crawled_directories);
789
871
 
790
872
        /* Cancel every pending task */
791
 
        tracker_processing_pool_foreach (priv->processing_pool,
792
 
                                         processing_pool_cancel_foreach,
793
 
                                         NULL);
794
 
        tracker_processing_pool_free (priv->processing_pool);
795
 
 
796
 
        g_queue_foreach (priv->items_moved, (GFunc) item_moved_data_free, NULL);
797
 
        g_queue_free (priv->items_moved);
798
 
 
799
 
        g_queue_foreach (priv->items_deleted, (GFunc) g_object_unref, NULL);
800
 
        g_queue_free (priv->items_deleted);
801
 
 
802
 
        g_queue_foreach (priv->items_updated, (GFunc) g_object_unref, NULL);
803
 
        g_queue_free (priv->items_updated);
804
 
 
805
 
        g_queue_foreach (priv->items_created, (GFunc) g_object_unref, NULL);
806
 
        g_queue_free (priv->items_created);
 
873
        tracker_task_pool_foreach (priv->task_pool,
 
874
                                   task_pool_cancel_foreach,
 
875
                                   NULL);
 
876
        g_object_unref (priv->task_pool);
 
877
        g_list_free (priv->extraction_tasks);
 
878
 
 
879
        g_object_unref (priv->writeback_pool);
 
880
 
 
881
        if (priv->sparql_buffer) {
 
882
                g_object_unref (priv->sparql_buffer);
 
883
        }
 
884
 
 
885
        tracker_priority_queue_foreach (priv->items_moved,
 
886
                                        (GFunc) item_moved_data_free,
 
887
                                        NULL);
 
888
        tracker_priority_queue_unref (priv->items_moved);
 
889
 
 
890
        tracker_priority_queue_foreach (priv->items_deleted,
 
891
                                        (GFunc) g_object_unref,
 
892
                                        NULL);
 
893
        tracker_priority_queue_unref (priv->items_deleted);
 
894
 
 
895
        tracker_priority_queue_foreach (priv->items_updated,
 
896
                                        (GFunc) g_object_unref,
 
897
                                        NULL);
 
898
        tracker_priority_queue_unref (priv->items_updated);
 
899
 
 
900
        tracker_priority_queue_foreach (priv->items_created,
 
901
                                        (GFunc) g_object_unref,
 
902
                                        NULL);
 
903
        tracker_priority_queue_unref (priv->items_created);
 
904
 
 
905
        tracker_priority_queue_foreach (priv->items_writeback,
 
906
                                        (GFunc) item_writeback_data_free,
 
907
                                        NULL);
 
908
        tracker_priority_queue_unref (priv->items_writeback);
807
909
 
808
910
        g_list_foreach (priv->dirs_without_parent, (GFunc) g_object_unref, NULL);
809
911
        g_list_free (priv->dirs_without_parent);
844
946
                                               g_value_get_double (value));
845
947
                break;
846
948
        case PROP_WAIT_POOL_LIMIT:
847
 
                tracker_processing_pool_set_wait_limit (fs->priv->processing_pool,
848
 
                                                        g_value_get_uint (value));
 
949
                tracker_task_pool_set_limit (fs->priv->task_pool,
 
950
                                             g_value_get_uint (value));
849
951
                break;
850
952
        case PROP_READY_POOL_LIMIT:
851
 
                tracker_processing_pool_set_ready_limit (fs->priv->processing_pool,
852
 
                                                         g_value_get_uint (value));
853
 
                break;
854
 
        case PROP_N_REQUESTS_POOL_LIMIT:
855
 
                tracker_processing_pool_set_n_requests_limit (fs->priv->processing_pool,
856
 
                                                              g_value_get_uint (value));
 
953
                fs->priv->sparql_buffer_limit = g_value_get_uint (value);
 
954
 
 
955
                if (fs->priv->sparql_buffer) {
 
956
                        tracker_task_pool_set_limit (TRACKER_TASK_POOL (fs->priv->sparql_buffer),
 
957
                                                     fs->priv->sparql_buffer_limit);
 
958
                }
857
959
                break;
858
960
        case PROP_MTIME_CHECKING:
859
961
                fs->priv->mtime_checking = g_value_get_boolean (value);
883
985
                break;
884
986
        case PROP_WAIT_POOL_LIMIT:
885
987
                g_value_set_uint (value,
886
 
                                  tracker_processing_pool_get_wait_limit (fs->priv->processing_pool));
 
988
                                  tracker_task_pool_get_limit (fs->priv->task_pool));
887
989
                break;
888
990
        case PROP_READY_POOL_LIMIT:
889
 
                g_value_set_uint (value,
890
 
                                  tracker_processing_pool_get_ready_limit (fs->priv->processing_pool));
891
 
                break;
892
 
        case PROP_N_REQUESTS_POOL_LIMIT:
893
 
                g_value_set_uint (value,
894
 
                                  tracker_processing_pool_get_n_requests_limit (fs->priv->processing_pool));
 
991
                g_value_set_uint (value, fs->priv->sparql_buffer_limit);
895
992
                break;
896
993
        case PROP_MTIME_CHECKING:
897
994
                g_value_set_boolean (value, fs->priv->mtime_checking);
981
1078
        /* Only set up queue handler if we have items waiting to be
982
1079
         * processed.
983
1080
         */
984
 
        if (g_queue_get_length (fs->priv->items_deleted) > 0 ||
985
 
            g_queue_get_length (fs->priv->items_created) > 0 ||
986
 
            g_queue_get_length (fs->priv->items_updated) > 0 ||
987
 
            g_queue_get_length (fs->priv->items_moved) > 0) {
 
1081
        if (tracker_miner_fs_has_items_to_process (fs)) {
988
1082
                item_queue_handlers_set_up (fs);
989
1083
        }
990
1084
}
1126
1220
        g_slice_free (ItemMovedData, data);
1127
1221
}
1128
1222
 
1129
 
static void
1130
 
processing_pool_task_finished_cb (TrackerProcessingTask *task,
1131
 
                                  gpointer               user_data,
1132
 
                                  const GError          *error)
 
1223
/*
 * Allocates an ItemWritebackData from the slice allocator.
 *
 * Takes its own reference on @file and @results, duplicates @rdf_types,
 * creates a fresh GCancellable and clears the notified flag. The result
 * is released with item_writeback_data_free().
 */
static ItemWritebackData *
 
1224
item_writeback_data_new (GFile     *file,
 
1225
                         GStrv      rdf_types,
 
1226
                         GPtrArray *results)
 
1227
{
 
1228
        ItemWritebackData *data;
 
1229
 
 
1230
        data = g_slice_new (ItemWritebackData);
 
1231
 
 
1232
        data->file = g_object_ref (file);
 
1233
        data->results = g_ptr_array_ref (results);
 
1234
        data->rdf_types = g_strdupv (rdf_types);
 
1235
        data->cancellable = g_cancellable_new ();
 
1236
        data->notified = FALSE;
 
1237
 
 
1238
        return data;
 
1239
}
 
1240
 
 
1241
/*
 * Releases everything acquired by item_writeback_data_new(): drops the
 * file, results and cancellable references, frees the copied RDF type
 * vector and returns the struct to the slice allocator. @data is
 * dereferenced unconditionally, so it must not be NULL.
 */
static void
 
1242
item_writeback_data_free (ItemWritebackData *data)
 
1243
{
 
1244
        g_object_unref (data->file);
 
1245
        g_ptr_array_unref (data->results);
 
1246
        g_strfreev (data->rdf_types);
 
1247
        g_object_unref (data->cancellable);
 
1248
        g_slice_free (ItemWritebackData, data);
 
1249
}
 
1250
 
 
1251
static void
 
1252
sparql_buffer_task_finished_cb (GObject      *object,
 
1253
                                GAsyncResult *result,
 
1254
                                gpointer      user_data)
1133
1255
{
1134
1256
        TrackerMinerFS *fs;
1135
1257
        TrackerMinerFSPrivate *priv;
 
1258
        TrackerTask *task;
 
1259
        GError *error = NULL;
1136
1260
 
1137
1261
        fs = user_data;
1138
1262
        priv = fs->priv;
1139
1263
 
1140
 
        if (error) {
 
1264
        if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (result),
 
1265
                                                   &error)) {
1141
1266
                g_critical ("Could not execute sparql: %s", error->message);
1142
1267
                priv->total_files_notified_error++;
1143
 
        } else {
1144
 
                if (fs->priv->current_iri_cache_parent) {
1145
 
                        GFile *parent;
1146
 
                        GFile *task_file;
1147
 
 
1148
 
                        task_file = tracker_processing_task_get_file (task);
1149
 
 
1150
 
                        /* Note: parent may be NULL if the file represents
1151
 
                         * the root directory of the file system (applies to
1152
 
                         * .gvfs mounts also!) */
1153
 
                        parent = g_file_get_parent (task_file);
1154
 
 
1155
 
                        if (parent) {
1156
 
                                if (g_file_equal (parent, fs->priv->current_iri_cache_parent) &&
1157
 
                                    g_hash_table_lookup (fs->priv->iri_cache, task_file) == NULL) {
1158
 
                                        /* Item is processed, add an empty element for the processed GFile,
1159
 
                                         * in case it is again processed before the cache expires
1160
 
                                         */
1161
 
                                        g_hash_table_insert (fs->priv->iri_cache,
1162
 
                                                             g_object_ref (task_file),
1163
 
                                                             NULL);
1164
 
                                }
1165
 
 
1166
 
                                g_object_unref (parent);
 
1268
                g_error_free (error);
 
1269
        } else if (fs->priv->current_iri_cache_parent) {
 
1270
                GFile *parent;
 
1271
                GFile *task_file;
 
1272
 
 
1273
                task = g_simple_async_result_get_op_res_gpointer (G_SIMPLE_ASYNC_RESULT (result));
 
1274
                task_file = tracker_task_get_file (task);
 
1275
 
 
1276
                /* Note: parent may be NULL if the file represents
 
1277
                 * the root directory of the file system (applies to
 
1278
                 * .gvfs mounts also!) */
 
1279
                parent = g_file_get_parent (task_file);
 
1280
 
 
1281
                if (parent) {
 
1282
                        if (g_file_equal (parent, fs->priv->current_iri_cache_parent) &&
 
1283
                            g_hash_table_lookup (fs->priv->iri_cache, task_file) == NULL) {
 
1284
                                /* Item is processed, add an empty element for the processed GFile,
 
1285
                                 * in case it is again processed before the cache expires
 
1286
                                 */
 
1287
                                g_hash_table_insert (fs->priv->iri_cache,
 
1288
                                                     g_object_ref (task_file),
 
1289
                                                     NULL);
1167
1290
                        }
 
1291
 
 
1292
                        g_object_unref (parent);
1168
1293
                }
1169
1294
        }
1170
1295
 
1358
1483
        return FALSE;
1359
1484
}
1360
1485
 
 
1486
/*
 * Returns TRUE when @file is the directory described by @dir_data, or
 * when the directory is flagged as recursive and @file has it as a path
 * prefix. Returns FALSE otherwise.
 */
static gboolean
 
1487
directory_contains_file (DirectoryData *dir_data,
 
1488
                         GFile         *file)
 
1489
{
 
1490
 
 
1491
        /* The prefix test only applies to recursive directories. */
        if (g_file_equal (file, dir_data->file) ||
 
1492
            (dir_data->recurse &&
 
1493
             g_file_has_prefix (file, dir_data->file))) {
 
1494
                return TRUE;
 
1495
        }
 
1496
 
 
1497
        return FALSE;
 
1498
}
 
1499
 
1361
1500
static DirectoryData *
1362
1501
find_config_directory (TrackerMinerFS *fs,
1363
1502
                       GFile          *file)
1372
1511
                data = dirs->data;
1373
1512
                dirs = dirs->next;
1374
1513
 
1375
 
                if (g_file_equal (data->file, file) ||
1376
 
                    (data->recurse && (g_file_has_prefix (file, data->file)))) {
 
1514
                if (directory_contains_file (data, file)) {
1377
1515
                        return data;
1378
1516
                }
1379
1517
        }
1406
1544
 
1407
1545
        g_debug ("Generating children cache for URI '%s'", uri);
1408
1546
 
1409
 
        query = g_strdup_printf ("SELECT ?url ?u { "
1410
 
                                 "  ?u nfo:belongsToContainer ?p ; "
1411
 
                                 "     nie:url ?url . "
1412
 
                                 "  ?p nie:url \"%s\" "
1413
 
                                 "}",
1414
 
                                 uri);
 
1547
        if (fs->priv->current_iri_cache_parent_urn) {
 
1548
                query = g_strdup_printf ("SELECT ?url ?u { "
 
1549
                                         "  ?u nfo:belongsToContainer <%s> ; "
 
1550
                                         "     nie:url ?url "
 
1551
                                         "}",
 
1552
                                         fs->priv->current_iri_cache_parent_urn);
 
1553
        } else {
 
1554
                query = g_strdup_printf ("SELECT ?url ?u { "
 
1555
                                         "  ?u nfo:belongsToContainer ?p ; "
 
1556
                                         "     nie:url ?url . "
 
1557
                                         "  ?p nie:url \"%s\" "
 
1558
                                         "}",
 
1559
                                         uri);
 
1560
        }
1415
1561
 
1416
1562
        data.main_loop = g_main_loop_new (NULL, FALSE);
1417
1563
        data.values = g_hash_table_ref (fs->priv->iri_cache);
1553
1699
        g_hash_table_remove (fs->priv->iri_cache, file);
1554
1700
}
1555
1701
 
 
1702
/*
 * Makes the IRI cache follow the parent directory of @file.
 *
 * If @file has a parent and it differs from the currently cached parent
 * (or none is cached yet), the cached parent GFile and URN are replaced:
 * the URN is looked up again through item_query_exists() and
 * ensure_iri_cache() is then called for @file. Nothing happens when
 * @file has no parent or the parent is unchanged.
 */
static void
 
1703
iri_cache_check_update (TrackerMinerFS *fs,
 
1704
                        GFile          *file)
 
1705
{
 
1706
        GFile *parent;
 
1707
 
 
1708
        parent = g_file_get_parent (file);
 
1709
 
 
1710
        if (parent) {
 
1711
                if (!fs->priv->current_iri_cache_parent ||
 
1712
                    !g_file_equal (parent, fs->priv->current_iri_cache_parent)) {
 
1713
                        /* Cache the URN for the new current parent, processing
 
1714
                         * order guarantees that all contents for a folder are
 
1715
                         * inspected together, and that the parent folder info
 
1716
                         * is already in tracker-store. So this should only
 
1717
                         * happen on folder switch.
 
1718
                         */
 
1719
                        if (fs->priv->current_iri_cache_parent)
 
1720
                                g_object_unref (fs->priv->current_iri_cache_parent);
 
1721
 
 
1722
                        /* NOTE(review): the field keeps pointing at the freed
                         * string until item_query_exists() stores a new value or
                         * the failure branch below resets it to NULL - confirm
                         * nothing reads it in between. */
                        g_free (fs->priv->current_iri_cache_parent_urn);
 
1723
 
 
1724
                        fs->priv->current_iri_cache_parent = g_object_ref (parent);
 
1725
 
 
1726
                        /* A failed lookup leaves the cached URN explicitly unset. */
                        if (!item_query_exists (fs,
 
1727
                                                parent,
 
1728
                                                FALSE,
 
1729
                                                &fs->priv->current_iri_cache_parent_urn,
 
1730
                                                NULL)) {
 
1731
                                fs->priv->current_iri_cache_parent_urn = NULL;
 
1732
                        }
 
1733
 
 
1734
                        ensure_iri_cache (fs, file);
 
1735
                }
 
1736
 
 
1737
                /* Drop the reference returned by g_file_get_parent(). */
                g_object_unref (parent);
 
1738
        }
 
1739
}
 
1740
 
1556
1741
static UpdateProcessingTaskContext *
1557
1742
update_processing_task_context_new (TrackerMiner         *miner,
 
1743
                                    gint                  priority,
1558
1744
                                    const gchar          *urn,
1559
1745
                                    const gchar          *parent_urn,
1560
1746
                                    GCancellable         *cancellable,
1566
1752
        ctxt->miner = miner;
1567
1753
        ctxt->urn = g_strdup (urn);
1568
1754
        ctxt->parent_urn = g_strdup (parent_urn);
 
1755
        ctxt->priority = priority;
1569
1756
 
1570
1757
        if (cancellable) {
1571
1758
                ctxt->cancellable = g_object_ref (cancellable);
1596
1783
}
1597
1784
 
1598
1785
static gboolean
1599
 
do_process_file (TrackerMinerFS        *fs,
1600
 
                 TrackerProcessingTask *task)
 
1786
do_process_file (TrackerMinerFS *fs,
 
1787
                 TrackerTask    *task)
1601
1788
{
1602
1789
        TrackerMinerFSPrivate *priv;
1603
1790
        gboolean processing;
1606
1793
        GFile *task_file;
1607
1794
        UpdateProcessingTaskContext *ctxt;
1608
1795
 
1609
 
        ctxt = tracker_processing_task_get_context (task);
1610
 
        task_file = tracker_processing_task_get_file (task);
 
1796
        ctxt = tracker_task_get_data (task);
 
1797
        task_file = tracker_task_get_file (task);
1611
1798
        uri = g_file_get_uri (task_file);
1612
1799
        priv = fs->priv;
1613
1800
 
1634
1821
                /* Re-fetch data, since it might have been
1635
1822
                 * removed in broken implementations
1636
1823
                 */
1637
 
                task = tracker_processing_pool_find_task (priv->processing_pool, task_file, FALSE);
 
1824
                task = tracker_task_pool_find (priv->task_pool, task_file);
1638
1825
 
1639
1826
                g_message ("%s refused to process '%s'", G_OBJECT_TYPE_NAME (fs), uri);
1640
1827
 
1644
1831
                                    "tracker_miner_fs_file_notify(), this is an "
1645
1832
                                    "implementation error", G_OBJECT_TYPE_NAME (fs), uri);
1646
1833
                } else {
1647
 
                        tracker_processing_pool_remove_task (priv->processing_pool, task);
1648
 
                        tracker_processing_task_free (task);
 
1834
                        tracker_task_pool_remove (priv->task_pool, task);
 
1835
                        priv->extraction_tasks = g_list_remove (priv->extraction_tasks, task);
 
1836
                        tracker_task_unref (task);
1649
1837
                }
1650
1838
        }
1651
1839
 
1655
1843
}
1656
1844
 
1657
1845
static void
1658
 
item_add_or_update_cb (TrackerMinerFS        *fs,
1659
 
                       TrackerProcessingTask *task,
1660
 
                       const GError          *error)
 
1846
item_add_or_update_cb (TrackerMinerFS *fs,
 
1847
                       TrackerTask    *extraction_task,
 
1848
                       const GError   *error)
1661
1849
{
1662
1850
        UpdateProcessingTaskContext *ctxt;
 
1851
        TrackerTask *sparql_task;
1663
1852
        GFile *task_file;
1664
1853
        gchar *uri;
1665
1854
 
1666
 
        ctxt = tracker_processing_task_get_context (task);
1667
 
        task_file = tracker_processing_task_get_file (task);
 
1855
        ctxt = tracker_task_get_data (extraction_task);
 
1856
        task_file = tracker_task_get_file (extraction_task);
1668
1857
        uri = g_file_get_uri (task_file);
1669
1858
 
1670
1859
        if (error) {
1671
 
                TrackerProcessingTask *first_item_task;
1672
 
 
1673
 
                first_item_task = tracker_processing_pool_get_last_wait (fs->priv->processing_pool);
 
1860
                TrackerTask *first_item_task = NULL;
 
1861
                GList *first_task;
 
1862
 
 
1863
                first_task = g_list_last (fs->priv->extraction_tasks);
 
1864
 
 
1865
                if (first_task) {
 
1866
                        first_item_task = first_task->data;
 
1867
                }
1674
1868
 
1675
1869
                /* Perhaps this is too specific to TrackerMinerFiles, if the extractor
1676
1870
                 * is choking on some file, the miner will get a timeout for all files
1678
1872
                 * is the first one that was added to the processing pool, so we retry
1679
1873
                 * the others.
1680
1874
                 */
1681
 
                if (task != first_item_task &&
 
1875
                if (extraction_task != first_item_task &&
1682
1876
                    (error->code == G_DBUS_ERROR_NO_REPLY ||
1683
1877
                     error->code == G_DBUS_ERROR_TIMEOUT ||
1684
1878
                     error->code == G_DBUS_ERROR_TIMED_OUT)) {
1688
1882
                        g_object_unref (ctxt->builder);
1689
1883
                        ctxt->builder = tracker_sparql_builder_new_update ();
1690
1884
 
1691
 
                        do_process_file (fs, task);
 
1885
                        do_process_file (fs, extraction_task);
1692
1886
                        g_free (uri);
1693
1887
 
1694
1888
                        return;
1699
1893
 
1700
1894
                        if (error->code == G_IO_ERROR_CANCELLED) {
1701
1895
                                /* Cancelled is cancelled, just move along in this case */
1702
 
                                tracker_processing_pool_remove_task (fs->priv->processing_pool, task);
1703
 
                                tracker_processing_task_free (task);
 
1896
                                tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
 
1897
                                fs->priv->extraction_tasks = g_list_remove (fs->priv->extraction_tasks,
 
1898
                                                                            extraction_task);
 
1899
                                tracker_task_unref (extraction_task);
1704
1900
 
1705
1901
                                item_queue_handlers_set_up (fs);
1706
1902
                                return;
1708
1904
                }
1709
1905
        }
1710
1906
 
 
1907
        tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
 
1908
        fs->priv->extraction_tasks = g_list_remove (fs->priv->extraction_tasks,
 
1909
                                                    extraction_task);
 
1910
 
1711
1911
        if (ctxt->urn) {
1712
1912
                gboolean attribute_update_only;
1713
1913
 
1753
1953
                                                       ctxt->urn, ctxt->urn,
1754
1954
                                                       tracker_sparql_builder_get_result (ctxt->builder));
1755
1955
 
1756
 
                        /* Note that set_sparql_string() takes ownership of the passed string */
1757
 
                        tracker_processing_task_set_sparql_string (task, full_sparql);
 
1956
                        sparql_task = tracker_sparql_task_new_take_sparql_str (task_file, full_sparql);
1758
1957
                } else {
1759
1958
                        /* Do not drop graph if only updating attributes, the SPARQL builder
1760
1959
                         * will already contain the necessary DELETE statements for the properties
1761
1960
                         * being updated */
1762
 
                        tracker_processing_task_set_sparql (task, ctxt->builder);
 
1961
                        sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
1763
1962
                }
1764
1963
        } else {
1765
1964
                if (error != NULL) {
1770
1969
                        g_debug ("Creating new item '%s'", uri);
1771
1970
                }
1772
1971
 
1773
 
                tracker_processing_task_set_sparql (task, ctxt->builder);
 
1972
                sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
1774
1973
        }
1775
1974
 
1776
 
        /* If push_ready_task() returns FALSE, it means the actual db update was delayed,
1777
 
         * and in this case we need to setup queue handlers again */
1778
 
        if (!tracker_processing_pool_push_ready_task (fs->priv->processing_pool,
1779
 
                                                      task,
1780
 
                                                      processing_pool_task_finished_cb,
1781
 
                                                      fs)) {
 
1975
        tracker_sparql_buffer_push (fs->priv->sparql_buffer,
 
1976
                                    sparql_task,
 
1977
                                    ctxt->priority,
 
1978
                                    sparql_buffer_task_finished_cb,
 
1979
                                    fs);
 
1980
 
 
1981
        if (!tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
1782
1982
                item_queue_handlers_set_up (fs);
1783
1983
        }
1784
1984
 
 
1985
        tracker_task_unref (extraction_task);
 
1986
 
1785
1987
        g_free (uri);
1786
1988
}
1787
1989
 
1788
1990
static gboolean
1789
1991
item_add_or_update (TrackerMinerFS *fs,
1790
 
                    GFile          *file)
 
1992
                    GFile          *file,
 
1993
                    gint            priority)
1791
1994
{
1792
1995
        TrackerMinerFSPrivate *priv;
1793
1996
        TrackerSparqlBuilder *sparql;
1794
1997
        GCancellable *cancellable;
1795
1998
        gboolean retval;
1796
 
        TrackerProcessingTask *task;
1797
 
        GFile *parent;
 
1999
        TrackerTask *task;
1798
2000
        const gchar *urn;
1799
 
        const gchar *parent_urn = NULL;
1800
2001
        UpdateProcessingTaskContext *ctxt;
1801
2002
 
1802
2003
        priv = fs->priv;
1806
2007
        sparql = tracker_sparql_builder_new_update ();
1807
2008
        g_object_ref (file);
1808
2009
 
1809
 
        parent = g_file_get_parent (file);
1810
 
 
1811
 
        if (parent) {
1812
 
                if (!fs->priv->current_iri_cache_parent ||
1813
 
                    !g_file_equal (parent, fs->priv->current_iri_cache_parent)) {
1814
 
                        /* Cache the URN for the new current parent, processing
1815
 
                         * order guarantees that all contents for a folder are
1816
 
                         * inspected together, and that the parent folder info
1817
 
                         * is already in tracker-store. So this should only
1818
 
                         * happen on folder switch.
1819
 
                         */
1820
 
                        if (fs->priv->current_iri_cache_parent)
1821
 
                                g_object_unref (fs->priv->current_iri_cache_parent);
1822
 
 
1823
 
                        g_free (fs->priv->current_iri_cache_parent_urn);
1824
 
 
1825
 
                        fs->priv->current_iri_cache_parent = g_object_ref (parent);
1826
 
 
1827
 
                        if (!item_query_exists (fs,
1828
 
                                                parent,
1829
 
                                                FALSE,
1830
 
                                                &fs->priv->current_iri_cache_parent_urn,
1831
 
                                                NULL)) {
1832
 
                                fs->priv->current_iri_cache_parent_urn = NULL;
1833
 
                        }
1834
 
 
1835
 
                        ensure_iri_cache (fs, file);
1836
 
                }
1837
 
 
1838
 
                parent_urn = fs->priv->current_iri_cache_parent_urn;
1839
 
                g_object_unref (parent);
1840
 
        }
1841
 
 
1842
2010
        /* Force a direct URN query if not found in the cache. This is to handle
1843
2011
         * situations where an application inserted items in the store after we
1844
2012
         * updated the cache, or without a proper nfo:belongsToContainer */
1846
2014
 
1847
2015
        /* Create the extraction task and add it to the task pool (we need to extract
1848
2016
         * the file metadata and such) */
1849
 
        task = tracker_processing_task_new (file);
1850
2017
        ctxt = update_processing_task_context_new (TRACKER_MINER (fs),
 
2018
                                                   priority,
1851
2019
                                                   urn,
1852
 
                                                   parent_urn,
 
2020
                                                   fs->priv->current_iri_cache_parent_urn,
1853
2021
                                                   cancellable,
1854
2022
                                                   sparql);
1855
 
        tracker_processing_task_set_context (task,
1856
 
                                             ctxt,
1857
 
                                             (GFreeFunc) update_processing_task_context_free);
1858
 
        tracker_processing_pool_push_wait_task (priv->processing_pool, task);
 
2023
        task = tracker_task_new (file, ctxt,
 
2024
                                 (GDestroyNotify) update_processing_task_context_free);
 
2025
        tracker_task_pool_add (priv->task_pool, task);
 
2026
        priv->extraction_tasks = g_list_prepend (priv->extraction_tasks, task);
1859
2027
 
1860
2028
        if (do_process_file (fs, task)) {
1861
2029
                fs->priv->total_files_processed++;
1862
2030
 
1863
 
                if (tracker_processing_pool_wait_limit_reached (priv->processing_pool)) {
 
2031
                if (tracker_task_pool_limit_reached (priv->task_pool)) {
1864
2032
                        retval = FALSE;
1865
2033
                }
1866
2034
        }
1878
2046
{
1879
2047
        gchar *uri;
1880
2048
        gchar *mime = NULL;
1881
 
        TrackerProcessingTask *task;
 
2049
        TrackerTask *task;
1882
2050
 
1883
2051
        iri_cache_invalidate (fs, file);
1884
2052
        uri = g_file_get_uri (file);
1906
2074
         * device). */
1907
2075
 
1908
2076
        /* Add new task to the SPARQL buffer */
1909
 
        task = tracker_processing_task_new (file);
1910
 
        tracker_processing_task_set_bulk_operation (task,
1911
 
                                                    "DELETE { "
1912
 
                                                    "  ?f tracker:available true "
1913
 
                                                    "}",
1914
 
                                                    TRACKER_BULK_MATCH_EQUALS |
1915
 
                                                    TRACKER_BULK_MATCH_CHILDREN);
 
2077
        task = tracker_sparql_task_new_bulk (file,
 
2078
                                             "DELETE { "
 
2079
                                             "  ?f tracker:available true "
 
2080
                                             "}",
 
2081
                                             TRACKER_BULK_MATCH_EQUALS |
 
2082
                                             TRACKER_BULK_MATCH_CHILDREN);
1916
2083
 
1917
 
        /* If push_ready_task() returns FALSE, it means the actual db update was delayed,
1918
 
         * and in this case we need to setup queue handlers again */
1919
 
        if (!tracker_processing_pool_push_ready_task (fs->priv->processing_pool,
1920
 
                                                      task,
1921
 
                                                      processing_pool_task_finished_cb,
1922
 
                                                      fs)) {
1923
 
                item_queue_handlers_set_up (fs);
1924
 
        }
 
2084
        tracker_sparql_buffer_push (fs->priv->sparql_buffer,
 
2085
                                    task,
 
2086
                                    G_PRIORITY_DEFAULT,
 
2087
                                    sparql_buffer_task_finished_cb,
 
2088
                                    fs);
1925
2089
 
1926
2090
        /* SECOND:
1927
2091
         * Actually remove all resources. This operation is the one which may take
1929
2093
         */
1930
2094
 
1931
2095
        /* Add new task to the SPARQL buffer */
1932
 
        task = tracker_processing_task_new (file);
1933
 
        tracker_processing_task_set_bulk_operation (task,
1934
 
                                                    "DELETE { "
1935
 
                                                    "  ?f a rdfs:Resource "
1936
 
                                                    "}",
1937
 
                                                    TRACKER_BULK_MATCH_EQUALS |
1938
 
                                                    TRACKER_BULK_MATCH_CHILDREN);
1939
 
 
1940
 
        /* If push_ready_task() returns FALSE, it means the actual db update was delayed,
1941
 
         * and in this case we need to setup queue handlers again */
1942
 
        if (!tracker_processing_pool_push_ready_task (fs->priv->processing_pool,
1943
 
                                                      task,
1944
 
                                                      processing_pool_task_finished_cb,
1945
 
                                                      fs)) {
 
2096
        task = tracker_sparql_task_new_bulk (file,
 
2097
                                             "DELETE { "
 
2098
                                             "  ?f a rdfs:Resource "
 
2099
                                             "}",
 
2100
                                             TRACKER_BULK_MATCH_EQUALS |
 
2101
                                             TRACKER_BULK_MATCH_CHILDREN);
 
2102
 
 
2103
        tracker_sparql_buffer_push (fs->priv->sparql_buffer,
 
2104
                                    task,
 
2105
                                    G_PRIORITY_DEFAULT,
 
2106
                                    sparql_buffer_task_finished_cb,
 
2107
                                    fs);
 
2108
 
 
2109
        if (!tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
1946
2110
                item_queue_handlers_set_up (fs);
1947
2111
        }
1948
2112
 
2135
2299
        GFileInfo *file_info;
2136
2300
        GString   *sparql;
2137
2301
        RecursiveMoveData move_data;
2138
 
        TrackerProcessingTask *task;
 
2302
        TrackerTask *task;
2139
2303
        gchar *source_iri;
2140
2304
        gchar *display_name;
2141
2305
        gboolean source_exists;
2185
2349
                if (file_type == G_FILE_TYPE_DIRECTORY &&
2186
2350
                    should_recurse_for_directory (fs, file)) {
2187
2351
                        /* We're dealing with a recursive directory */
2188
 
                        tracker_miner_fs_directory_add_internal (fs, file);
 
2352
                        tracker_miner_fs_directory_add_internal (fs, file,
 
2353
                                                                 G_PRIORITY_DEFAULT);
2189
2354
                        retval = TRUE;
2190
2355
                } else {
2191
 
                        retval = item_add_or_update (fs, file);
 
2356
                        retval = item_add_or_update (fs, file, G_PRIORITY_DEFAULT);
2192
2357
                }
2193
2358
 
2194
2359
                g_free (source_uri);
2280
2445
        g_main_loop_unref (move_data.main_loop);
2281
2446
 
2282
2447
        /* Add new task to the SPARQL buffer */
2283
 
        task = tracker_processing_task_new (file);
2284
 
        /* Note that set_sparql_string() takes ownership of the passed string */
2285
 
        tracker_processing_task_set_sparql_string (task,
2286
 
                                                   g_string_free (sparql, FALSE));
2287
 
        /* If push_ready_task() returns FALSE, it means the actual db update was delayed,
2288
 
         * and in this case we need to setup queue handlers again */
2289
 
        if (!tracker_processing_pool_push_ready_task (fs->priv->processing_pool,
2290
 
                                                      task,
2291
 
                                                      processing_pool_task_finished_cb,
2292
 
                                                      fs)) {
 
2448
        task = tracker_sparql_task_new_take_sparql_str (file,
 
2449
                                                        g_string_free (sparql,
 
2450
                                                                       FALSE));
 
2451
        tracker_sparql_buffer_push (fs->priv->sparql_buffer,
 
2452
                                    task,
 
2453
                                    G_PRIORITY_DEFAULT,
 
2454
                                    sparql_buffer_task_finished_cb,
 
2455
                                    fs);
 
2456
 
 
2457
        if (!tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
2293
2458
                item_queue_handlers_set_up (fs);
2294
2459
        }
2295
2460
 
2318
2483
{
2319
2484
        CrawledDirectoryData *dir_data;
2320
2485
        GList *l, *post_nodes = NULL;
 
2486
        gint priority;
2321
2487
        GFile *file;
2322
2488
        GNode *node;
2323
2489
 
2324
 
        dir_data = g_queue_peek_head (fs->priv->crawled_directories);
 
2490
        dir_data = tracker_priority_queue_peek (fs->priv->crawled_directories,
 
2491
                                                &priority);
2325
2492
 
2326
2493
        if (g_queue_is_empty (dir_data->nodes)) {
2327
2494
                /* Special case, append the root directory for the tree */
2333
2500
 
2334
2501
                if (!g_object_get_qdata (G_OBJECT (file), fs->priv->quark_ignore_file)) {
2335
2502
                        trace_eq_push_tail ("CREATED", file, "Root directory of tree");
2336
 
                        g_queue_push_tail (fs->priv->items_created, g_object_ref (file));
 
2503
 
 
2504
                        tracker_priority_queue_add (fs->priv->items_created,
 
2505
                                                    g_object_ref (file),
 
2506
                                                    priority);
2337
2507
                        return;
2338
2508
                }
2339
2509
        }
2363
2533
 
2364
2534
                        if (!g_object_get_qdata (G_OBJECT (file), fs->priv->quark_ignore_file)) {
2365
2535
                                trace_eq_push_tail ("CREATED", file, NULL);
2366
 
                                g_queue_push_tail (fs->priv->items_created, g_object_ref (file));
 
2536
 
 
2537
                                tracker_priority_queue_add (fs->priv->items_created,
 
2538
                                                            g_object_ref (file),
 
2539
                                                            priority);
2367
2540
                        }
2368
2541
 
2369
2542
                        if (children->children) {
2387
2560
 
2388
2561
        if (g_queue_is_empty (dir_data->nodes)) {
2389
2562
                /* There's no more data to process, move on to the next one */
2390
 
                g_queue_pop_head (fs->priv->crawled_directories);
 
2563
                tracker_priority_queue_pop (fs->priv->crawled_directories,
 
2564
                                            NULL);
2391
2565
                crawled_directory_data_free (dir_data);
2392
2566
        }
2393
2567
}
2400
2574
        GFile *parent;
2401
2575
 
2402
2576
        /* Is the item already being processed? */
2403
 
        if (tracker_processing_pool_find_task (fs->priv->processing_pool,
2404
 
                                               file,
2405
 
                                               TRUE)) {
 
2577
        if (tracker_task_pool_find (fs->priv->task_pool, file) ||
 
2578
            tracker_task_pool_find (fs->priv->writeback_pool, file) ||
 
2579
            tracker_task_pool_find (TRACKER_TASK_POOL (fs->priv->sparql_buffer), file)) {
2406
2580
                /* Yes, a previous event on the same item is currently
2407
2581
                 * being processed */
2408
2582
                return TRUE;
2411
2585
        /* Is the item's parent being processed right now? */
2412
2586
        parent = g_file_get_parent (file);
2413
2587
        if (parent) {
2414
 
                if (tracker_processing_pool_find_task (fs->priv->processing_pool,
2415
 
                                                       parent,
2416
 
                                                       TRUE)) {
 
2588
                if (tracker_task_pool_find (fs->priv->task_pool, parent) ||
 
2589
                    tracker_task_pool_find (TRACKER_TASK_POOL (fs->priv->sparql_buffer), parent)) {
2417
2590
                        /* Yes, a previous event on the parent of this item
2418
2591
                         * is currently being processed */
2419
2592
                        g_object_unref (parent);
2428
2601
static QueueState
2429
2602
item_queue_get_next_file (TrackerMinerFS  *fs,
2430
2603
                          GFile          **file,
2431
 
                          GFile          **source_file)
 
2604
                          GFile          **source_file,
 
2605
                          gint            *priority_out)
2432
2606
{
2433
2607
        ItemMovedData *data;
 
2608
        ItemWritebackData *wdata;
2434
2609
        GFile *queue_file;
2435
 
 
2436
 
        /* Deleted items first */
2437
 
        queue_file = g_queue_pop_head (fs->priv->items_deleted);
 
2610
        gint priority;
 
2611
 
 
2612
        /* Writeback items first */
 
2613
        wdata = tracker_priority_queue_pop (fs->priv->items_writeback,
 
2614
                                            &priority);
 
2615
        if (wdata) {
 
2616
                gboolean processing;
 
2617
 
 
2618
                *file = g_object_ref (wdata->file);
 
2619
                *source_file = NULL;
 
2620
                *priority_out = priority;
 
2621
 
 
2622
                trace_eq_pop_head ("WRITEBACK", wdata->file);
 
2623
 
 
2624
                g_signal_emit (fs, signals[WRITEBACK_FILE], 0,
 
2625
                               wdata->file,
 
2626
                               wdata->rdf_types,
 
2627
                               wdata->results,
 
2628
                               wdata->cancellable,
 
2629
                               &processing);
 
2630
 
 
2631
                if (processing) {
 
2632
                        TrackerTask *task;
 
2633
 
 
2634
                        task = tracker_task_new (wdata->file, wdata,
 
2635
                                                 (GDestroyNotify) item_writeback_data_free);
 
2636
                        tracker_task_pool_add (fs->priv->writeback_pool, task);
 
2637
 
 
2638
                        return QUEUE_WRITEBACK;
 
2639
                } else {
 
2640
                        item_writeback_data_free (wdata);
 
2641
                }
 
2642
        }
 
2643
 
 
2644
        /* Deleted items second */
 
2645
        queue_file = tracker_priority_queue_pop (fs->priv->items_deleted,
 
2646
                                                 &priority);
2438
2647
        if (queue_file) {
2439
2648
                *source_file = NULL;
2440
2649
 
2453
2662
                        trace_eq_push_head ("DELETED", queue_file, "Should wait");
2454
2663
 
2455
2664
                        /* Need to postpone event... */
2456
 
                        g_queue_push_head (fs->priv->items_deleted,
2457
 
                                           queue_file);
 
2665
                        tracker_priority_queue_add (fs->priv->items_deleted,
 
2666
                                                    queue_file, priority);
2458
2667
                        return QUEUE_WAIT;
2459
2668
                }
2460
2669
 
2461
2670
                *file = queue_file;
 
2671
                *priority_out = priority;
2462
2672
                return QUEUE_DELETED;
2463
2673
        }
2464
2674
 
2465
 
        if (g_queue_is_empty (fs->priv->items_created) &&
2466
 
            !g_queue_is_empty (fs->priv->crawled_directories)) {
 
2675
        if (tracker_priority_queue_is_empty (fs->priv->items_created) &&
 
2676
            !tracker_priority_queue_is_empty (fs->priv->crawled_directories)) {
2467
2677
 
2468
2678
                trace_eq ("Created items queue empty, but still crawling (%d tasks in WAIT state)",
2469
 
                          tracker_processing_pool_get_wait_task_count (fs->priv->processing_pool));
 
2679
                          tracker_task_pool_get_size (fs->priv->task_pool));
2470
2680
 
2471
2681
                /* The items_created queue is empty, but there are pending
2472
2682
                 * items from the crawler to be processed. We feed the queue
2474
2684
                 * info is inserted to the store before the children are
2475
2685
                 * inspected.
2476
2686
                 */
2477
 
                if (tracker_processing_pool_get_wait_task_count (fs->priv->processing_pool) > 0) {
 
2687
                if (tracker_task_pool_get_size (fs->priv->task_pool) > 0) {
2478
2688
                        /* Items still being processed */
2479
2689
                        *file = NULL;
2480
2690
                        *source_file = NULL;
2484
2694
                         * one of these return something for the miner to do,
2485
2695
                         * or no data is left to process.
2486
2696
                         */
2487
 
                        while (g_queue_is_empty (fs->priv->items_created) &&
2488
 
                               !g_queue_is_empty (fs->priv->crawled_directories)) {
 
2697
                        while (tracker_priority_queue_is_empty (fs->priv->items_created) &&
 
2698
                               !tracker_priority_queue_is_empty (fs->priv->crawled_directories)) {
2489
2699
                                fill_in_items_created_queue (fs);
2490
2700
                        }
2491
2701
                }
2492
2702
        }
2493
2703
 
2494
2704
        /* Created items next */
2495
 
        queue_file = g_queue_pop_head (fs->priv->items_created);
 
2705
        queue_file = tracker_priority_queue_pop (fs->priv->items_created,
 
2706
                                                 &priority);
2496
2707
        if (queue_file) {
2497
2708
                *source_file = NULL;
2498
2709
 
2532
2743
                        trace_eq_push_head ("CREATED", queue_file, "Should wait");
2533
2744
 
2534
2745
                        /* Need to postpone event... */
2535
 
                        g_queue_push_head (fs->priv->items_created,
2536
 
                                           queue_file);
 
2746
                        tracker_priority_queue_add (fs->priv->items_created,
 
2747
                                                    queue_file, priority);
2537
2748
                        return QUEUE_WAIT;
2538
2749
                }
2539
2750
 
2540
2751
                *file = queue_file;
 
2752
                *priority_out = priority;
2541
2753
                return QUEUE_CREATED;
2542
2754
        }
2543
2755
 
2544
2756
        /* Updated items next */
2545
 
        queue_file = g_queue_pop_head (fs->priv->items_updated);
 
2757
        queue_file = tracker_priority_queue_pop (fs->priv->items_updated,
 
2758
                                                 &priority);
2546
2759
        if (queue_file) {
2547
2760
                *file = queue_file;
2548
2761
                *source_file = NULL;
2569
2782
                        trace_eq_push_head ("UPDATED", queue_file, "Should wait");
2570
2783
 
2571
2784
                        /* Need to postpone event... */
2572
 
                        g_queue_push_head (fs->priv->items_updated,
2573
 
                                           queue_file);
 
2785
                        tracker_priority_queue_add (fs->priv->items_updated,
 
2786
                                                    queue_file, priority);
2574
2787
                        return QUEUE_WAIT;
2575
2788
                }
2576
2789
 
 
2790
                *priority_out = priority;
 
2791
 
2577
2792
                return QUEUE_UPDATED;
2578
2793
        }
2579
2794
 
2580
2795
        /* Moved items next */
2581
 
        data = g_queue_pop_head (fs->priv->items_moved);
 
2796
        data = tracker_priority_queue_pop (fs->priv->items_moved,
 
2797
                                           &priority);
2582
2798
        if (data) {
2583
2799
                trace_eq_pop_head_2 ("MOVED", data->file, data->source_file);
2584
2800
 
2610
2826
                        trace_eq_push_head_2 ("MOVED", data->source_file, data->file, "Should wait");
2611
2827
 
2612
2828
                        /* Need to postpone event... */
2613
 
                        g_queue_push_head (fs->priv->items_moved,
2614
 
                                           data); /* no need to create again */
 
2829
                        tracker_priority_queue_add (fs->priv->items_moved,
 
2830
                                                    data, priority);
2615
2831
                        return QUEUE_WAIT;
2616
2832
                }
2617
2833
 
2618
2834
                *file = g_object_ref (data->file);
2619
2835
                *source_file = g_object_ref (data->source_file);
 
2836
                *priority_out = priority;
2620
2837
                item_moved_data_free (data);
2621
2838
                return QUEUE_MOVED;
2622
2839
        }
2624
2841
        *file = NULL;
2625
2842
        *source_file = NULL;
2626
2843
 
 
2844
        if (fs->priv->is_crawling ||
 
2845
            fs->priv->crawl_directories_id != 0 ||
 
2846
            !tracker_priority_queue_is_empty (fs->priv->crawled_directories) ||
 
2847
            !tracker_priority_queue_is_empty (fs->priv->directories) ||
 
2848
            tracker_task_pool_limit_reached (fs->priv->task_pool) ||
 
2849
            tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
 
2850
                /* There are still pending items to crawl,
 
2851
                 * or extract pool limit is reached
 
2852
                 */
 
2853
                return QUEUE_WAIT;
 
2854
        }
 
2855
 
2627
2856
        return QUEUE_NONE;
2628
2857
}
2629
2858
 
2642
2871
        guint items_to_process = 0;
2643
2872
        guint items_total = 0;
2644
2873
 
2645
 
        items_to_process += g_queue_get_length (fs->priv->items_deleted);
2646
 
        items_to_process += g_queue_get_length (fs->priv->items_created);
2647
 
        items_to_process += g_queue_get_length (fs->priv->items_updated);
2648
 
        items_to_process += g_queue_get_length (fs->priv->items_moved);
 
2874
        items_to_process += tracker_priority_queue_get_length (fs->priv->items_deleted);
 
2875
        items_to_process += tracker_priority_queue_get_length (fs->priv->items_created);
 
2876
        items_to_process += tracker_priority_queue_get_length (fs->priv->items_updated);
 
2877
        items_to_process += tracker_priority_queue_get_length (fs->priv->items_moved);
 
2878
        items_to_process += tracker_priority_queue_get_length (fs->priv->items_writeback);
2649
2879
 
2650
 
        g_queue_foreach (fs->priv->crawled_directories,
2651
 
                         (GFunc) get_tree_progress_foreach,
2652
 
                         &items_to_process);
 
2880
        tracker_priority_queue_foreach (fs->priv->crawled_directories,
 
2881
                                        (GFunc) get_tree_progress_foreach,
 
2882
                                        &items_to_process);
2653
2883
 
2654
2884
        items_total += fs->priv->total_directories_found;
2655
2885
        items_total += fs->priv->total_files_found;
2680
2910
        TrackerMinerFS *fs;
2681
2911
        GFile *file = NULL;
2682
2912
        GFile *source_file = NULL;
 
2913
        GFile *parent;
2683
2914
        QueueState queue;
2684
2915
        GTimeVal time_now;
2685
2916
        static GTimeVal time_last = { 0 };
2686
2917
        gboolean keep_processing = TRUE;
 
2918
        gint priority = 0;
2687
2919
 
2688
2920
        fs = user_data;
2689
 
        queue = item_queue_get_next_file (fs, &file, &source_file);
 
2921
        queue = item_queue_get_next_file (fs, &file, &source_file, &priority);
2690
2922
 
2691
2923
        if (queue == QUEUE_WAIT) {
2692
2924
                /* Items are still being processed, and there is pending
2700
2932
                 * if there was a previous task on the same file we want to
2701
2933
                 * process now, we want it to get finished before we can go
2702
2934
                 * on with the queues... */
2703
 
                tracker_processing_pool_buffer_flush (fs->priv->processing_pool,
2704
 
                                                      "Queue handlers WAIT");
 
2935
                tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
 
2936
                                             "Queue handlers WAIT");
2705
2937
 
2706
2938
                return FALSE;
2707
2939
        }
2800
3032
        /* Handle queues */
2801
3033
        switch (queue) {
2802
3034
        case QUEUE_NONE:
2803
 
                /* Print stats and signal finished */
2804
3035
                if (!fs->priv->is_crawling &&
2805
 
                    tracker_processing_pool_get_total_task_count (fs->priv->processing_pool) == 0) {
2806
 
                        process_stop (fs);
 
3036
                    tracker_task_pool_get_size (fs->priv->task_pool) == 0) {
 
3037
                        if (tracker_task_pool_get_size (TRACKER_TASK_POOL (fs->priv->sparql_buffer)) == 0) {
 
3038
                                /* Print stats and signal finished */
 
3039
                                process_stop (fs);
 
3040
 
 
3041
                                tracker_thumbnailer_send ();
 
3042
                                tracker_albumart_check_cleanup (tracker_miner_get_connection (TRACKER_MINER (fs)));
 
3043
                        } else {
 
3044
                                /* Flush any possible pending update here */
 
3045
                                tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
 
3046
                                                             "Queue handlers NONE");
 
3047
                        }
2807
3048
                }
2808
3049
 
2809
 
                /* Flush any possible pending update here */
2810
 
                tracker_processing_pool_buffer_flush (fs->priv->processing_pool,
2811
 
                                                      "Queue handlers NONE");
2812
 
 
2813
 
                tracker_thumbnailer_send ();
2814
 
                tracker_albumart_check_cleanup (tracker_miner_get_connection (TRACKER_MINER (fs)));
2815
3050
                /* No more files left to process */
2816
3051
                keep_processing = FALSE;
2817
3052
                break;
2822
3057
                keep_processing = item_remove (fs, file);
2823
3058
                break;
2824
3059
        case QUEUE_CREATED:
2825
 
                /* Check existence before processing, if requested to do so. */
 
3060
                /* If the item is a directory which was found during crawling, we need
 
3061
                 * to check existence before processing */
2826
3062
                if (g_object_get_qdata (G_OBJECT (file),
2827
 
                                        fs->priv->quark_check_existence)) {
2828
 
                        /* Clear the qdata */
2829
 
                        g_object_set_qdata (G_OBJECT (file),
2830
 
                                            fs->priv->quark_check_existence,
2831
 
                                            GINT_TO_POINTER (FALSE));
2832
 
 
2833
 
                        /* Avoid adding items that already exist, when processing
2834
 
                         * a CREATED task (as those generated when crawling) */
2835
 
                        if (!item_query_exists (fs, file, FALSE, NULL, NULL)) {
2836
 
                                keep_processing = item_add_or_update (fs, file);
2837
 
                                break;
2838
 
                        }
2839
 
 
 
3063
                                        fs->priv->quark_directory_found_crawling) &&
 
3064
                    item_query_exists (fs, file, FALSE, NULL, NULL)) {
2840
3065
                        /* If already in store, skip processing the CREATED task */
2841
3066
                        keep_processing = TRUE;
2842
3067
                        break;
2843
3068
                }
2844
3069
                /* Else, fall down and treat as QUEUE_UPDATED */
2845
3070
        case QUEUE_UPDATED:
2846
 
                keep_processing = item_add_or_update (fs, file);
 
3071
                parent = g_file_get_parent (file);
 
3072
                iri_cache_check_update (fs, file);
 
3073
 
 
3074
                if (!parent ||
 
3075
                    fs->priv->current_iri_cache_parent_urn ||
 
3076
                    file_is_crawl_directory (fs, file)) {
 
3077
                        keep_processing = item_add_or_update (fs, file, priority);
 
3078
                } else {
 
3079
                        TrackerPriorityQueue *item_queue;
 
3080
                        gchar *uri;
 
3081
 
 
3082
                        uri = g_file_get_uri (parent);
 
3083
                        g_message ("Parent '%s' not indexed yet", uri);
 
3084
                        g_free (uri);
 
3085
 
 
3086
                        if (queue == QUEUE_CREATED) {
 
3087
                                item_queue = fs->priv->items_created;
 
3088
                        } else {
 
3089
                                item_queue = fs->priv->items_updated;
 
3090
                        }
 
3091
 
 
3092
                        /* Parent isn't indexed yet, reinsert the task into the queue,
 
3093
                         * but forcibly prepended by its parent so its indexing is
 
3094
                         * ensured, tasks are inserted at a higher priority so they
 
3095
                         * are processed promptly anyway.
 
3096
                         */
 
3097
                        tracker_priority_queue_add (item_queue,
 
3098
                                                    g_object_ref (parent),
 
3099
                                                    priority - 1);
 
3100
 
 
3101
                        tracker_priority_queue_add (item_queue,
 
3102
                                                    g_object_ref (file),
 
3103
                                                    priority);
 
3104
                        keep_processing = TRUE;
 
3105
                }
 
3106
 
 
3107
                if (parent) {
 
3108
                        g_object_unref (parent);
 
3109
                }
 
3110
 
2847
3111
                break;
2848
3112
        case QUEUE_IGNORE_NEXT_UPDATE:
2849
3113
                keep_processing = item_ignore_next_update (fs, file, source_file);
2850
3114
                break;
 
3115
        case QUEUE_WRITEBACK:
 
3116
                /* Nothing to do here */
 
3117
                keep_processing = TRUE;
 
3118
                break;
2851
3119
        default:
2852
3120
                g_assert_not_reached ();
2853
3121
        }
2895
3163
                return;
2896
3164
        }
2897
3165
 
2898
 
        /* Already sent max number of tasks to tracker-extract? */
2899
 
        if (tracker_processing_pool_wait_limit_reached (fs->priv->processing_pool)) {
 
3166
        /* Already sent max number of tasks to tracker-extract/writeback? */
 
3167
        if (tracker_task_pool_limit_reached (fs->priv->task_pool) ||
 
3168
            tracker_task_pool_limit_reached (fs->priv->writeback_pool)) {
2900
3169
                return;
2901
3170
        }
2902
3171
 
2903
 
        /* Already sent max number of requests to tracker-store?
2904
 
         * In this case, we also slow down the processing of items, as we don't
2905
 
         * want to keep on extracting if the communication with tracker-store is
2906
 
         * very busy. Note that this is not very likely to happen, as the bottleneck
2907
 
         * during extraction is not the communication with tracker-store.
2908
 
         */
2909
 
        if (tracker_processing_pool_n_requests_limit_reached (fs->priv->processing_pool)) {
 
3172
        if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
2910
3173
                return;
2911
3174
        }
2912
3175
 
2951
3214
                g_free (uri);
2952
3215
 
2953
3216
                trace_eq_push_tail ("DELETED", file, "No longer exists");
2954
 
                g_queue_push_tail (fs->priv->items_deleted,
2955
 
                                   g_object_ref (file));
 
3217
                tracker_priority_queue_add (fs->priv->items_deleted,
 
3218
                                            g_object_ref (file),
 
3219
                                            G_PRIORITY_LOW);
2956
3220
 
2957
3221
                item_queue_handlers_set_up (fs);
2958
3222
        }
2992
3256
        gchar *query, *uri;
2993
3257
        GFile *parent;
2994
3258
        guint cache_size;
 
3259
        GFile *parent_in_queue;
2995
3260
 
2996
3261
        if (G_UNLIKELY (!fs->priv->mtime_cache)) {
2997
3262
                fs->priv->mtime_cache = g_hash_table_new_full (g_file_hash,
3031
3296
         * Before querying the store, check if the parent directory is scheduled to
3032
3297
         * be added, and if so, leave the mtime cache empty.
3033
3298
         */
3034
 
        if (g_queue_find_custom (fs->priv->items_created,
3035
 
                                 parent,
3036
 
                                 (GCompareFunc)tracker_file_cmp) != NULL) {
 
3299
        parent_in_queue = tracker_priority_queue_find (fs->priv->items_created,
 
3300
                                                       NULL,
 
3301
                                                       (GEqualFunc) g_file_equal,
 
3302
                                                       parent);
 
3303
        if (parent_in_queue &&
 
3304
            !g_object_get_qdata (G_OBJECT (parent_in_queue),
 
3305
                                 fs->priv->quark_directory_found_crawling)) {
3037
3306
                uri = g_file_get_uri (file);
3038
3307
                g_debug ("Empty mtime cache for URI '%s' "
3039
3308
                         "(parent scheduled to be created)",
3255
3524
                         * in the store, put in deleted queue.
3256
3525
                         */
3257
3526
                        trace_eq_push_tail ("DELETED", file, "No longer to be indexed");
3258
 
                        g_queue_push_tail (fs->priv->items_deleted,
3259
 
                                           g_object_ref (file));
 
3527
                        tracker_priority_queue_add (fs->priv->items_deleted,
 
3528
                                                    g_object_ref (file),
 
3529
                                                    G_PRIORITY_DEFAULT);
3260
3530
                }
3261
3531
                return FALSE;
3262
3532
        }
3265
3535
        return should_change_index_for_file (fs, file);
3266
3536
}
3267
3537
 
3268
 
static gint
3269
 
compare_files (gconstpointer a,
3270
 
               gconstpointer b)
3271
 
{
3272
 
        if (g_file_equal (G_FILE (a), G_FILE (b))) {
3273
 
                return 0;
3274
 
        }
3275
 
 
3276
 
        return 1;
3277
 
}
3278
 
 
3279
 
static gint
3280
 
compare_moved_files (gconstpointer a,
3281
 
                     gconstpointer b)
 
3538
static gboolean
 
3539
moved_files_equal (gconstpointer a,
 
3540
                   gconstpointer b)
3282
3541
{
3283
3542
        const ItemMovedData *data = a;
3284
3543
        GFile *file = G_FILE (b);
3285
3544
 
3286
3545
        /* Compare with dest file */
3287
 
        if (g_file_equal (data->file, file)) {
3288
 
                return 0;
3289
 
        }
3290
 
 
3291
 
        return 1;
3292
 
}
3293
 
 
3294
 
/* Checks previous created/updated/deleted/moved queues for
 
3546
        return g_file_equal (data->file, file);
 
3547
}
 
3548
 
 
3549
static gboolean
 
3550
writeback_files_equal (gconstpointer a,
 
3551
                       gconstpointer b)
 
3552
{
 
3553
        const ItemWritebackData *data = a;
 
3554
        GFile *file = G_FILE (b);
 
3555
 
 
3556
        /* Compare with dest file */
 
3557
        return g_file_equal (data->file, file);
 
3558
}
 
3559
 
 
3560
static gboolean
 
3561
remove_writeback_task (TrackerMinerFS *fs,
 
3562
                       GFile          *file)
 
3563
{
 
3564
        TrackerTask *task;
 
3565
        ItemWritebackData *data;
 
3566
 
 
3567
        task = tracker_task_pool_find (fs->priv->writeback_pool, file);
 
3568
 
 
3569
        if (!task) {
 
3570
                return FALSE;
 
3571
        }
 
3572
 
 
3573
        data = tracker_task_get_data (task);
 
3574
 
 
3575
        if (data->notified) {
 
3576
                tracker_task_pool_remove (fs->priv->writeback_pool, task);
 
3577
                tracker_task_unref (task);
 
3578
                return TRUE;
 
3579
        }
 
3580
 
 
3581
        return FALSE;
 
3582
}
 
3583
 
 
3584
static void
 
3585
cancel_writeback_task (TrackerMinerFS *fs,
 
3586
                       GFile          *file)
 
3587
{
 
3588
        TrackerTask *task;
 
3589
 
 
3590
        task = tracker_task_pool_find (fs->priv->writeback_pool, file);
 
3591
 
 
3592
        if (task) {
 
3593
                ItemWritebackData *data;
 
3594
 
 
3595
                data = tracker_task_get_data (task);
 
3596
                g_cancellable_cancel (data->cancellable);
 
3597
                tracker_task_pool_remove (fs->priv->writeback_pool, task);
 
3598
                tracker_task_unref (task);
 
3599
        }
 
3600
}
 
3601
 
 
3602
/* Checks previous created/updated/deleted/moved/writeback queues for
3295
3603
 * monitor events. Returns TRUE if the item should still
3296
3604
 * be added to the queue.
3297
3605
 */
3298
3606
static gboolean
3299
3607
check_item_queues (TrackerMinerFS *fs,
3300
 
                   QueueState      queue,
3301
 
                   GFile          *file,
3302
 
                   GFile          *other_file)
 
3608
                   QueueState      queue,
 
3609
                   GFile          *file,
 
3610
                   GFile          *other_file)
3303
3611
{
3304
 
        GList *elem;
 
3612
        ItemMovedData *move_data;
3305
3613
 
3306
3614
        if (!fs->priv->been_crawled) {
3307
3615
                /* Only do this after initial crawling, so
3311
3619
                return TRUE;
3312
3620
        }
3313
3621
 
 
3622
        if (queue == QUEUE_UPDATED) {
 
3623
                TrackerTask *task;
 
3624
 
 
3625
                if (other_file) {
 
3626
                        task = tracker_task_pool_find (fs->priv->writeback_pool, other_file);
 
3627
                } else {
 
3628
                        task = tracker_task_pool_find (fs->priv->writeback_pool, file);
 
3629
                }
 
3630
 
 
3631
                if (task) {
 
3632
                        /* There is a writeback task for
 
3633
                         * this file, so avoid any updates
 
3634
                         */
 
3635
                        return FALSE;
 
3636
                }
 
3637
        }
 
3638
 
3314
3639
        switch (queue) {
3315
3640
        case QUEUE_CREATED:
3316
3641
                /* Created items aren't likely to have
3320
3645
                return TRUE;
3321
3646
        case QUEUE_UPDATED:
3322
3647
                /* No further updates after a previous created/updated event */
3323
 
                if (g_queue_find_custom (fs->priv->items_created, file, compare_files) ||
3324
 
                    g_queue_find_custom (fs->priv->items_updated, file, compare_files)) {
 
3648
                if (tracker_priority_queue_find (fs->priv->items_created, NULL,
 
3649
                                                 (GEqualFunc) g_file_equal, file) ||
 
3650
                    tracker_priority_queue_find (fs->priv->items_updated, NULL,
 
3651
                                                 (GEqualFunc) g_file_equal, file)) {
3325
3652
                        g_debug ("  Found previous unhandled CREATED/UPDATED event");
3326
3653
                        return FALSE;
3327
3654
                }
 
3655
        case QUEUE_WRITEBACK:
 
3656
                /* No consecutive writebacks for the same file */
 
3657
                if (tracker_priority_queue_find (fs->priv->items_writeback, NULL,
 
3658
                                                 writeback_files_equal, file)) {
 
3659
                        g_debug ("  Found previous unhandled WRITEBACK event");
 
3660
                        return FALSE;
 
3661
                }
3328
3662
 
3329
3663
                return TRUE;
3330
3664
        case QUEUE_DELETED:
3331
 
                elem = g_queue_find_custom (fs->priv->items_updated, file, compare_files);
 
3665
                if (tracker_task_pool_find (fs->priv->writeback_pool, file)) {
 
3666
                        /* Cancel writeback operations on a deleted file */
 
3667
                        cancel_writeback_task (fs, file);
 
3668
                }
3332
3669
 
3333
 
                if (elem) {
3334
 
                        /* Remove all previous updates */
 
3670
                /* Remove all previous updates */
 
3671
                if (tracker_priority_queue_foreach_remove (fs->priv->items_updated,
 
3672
                                                           (GEqualFunc) g_file_equal,
 
3673
                                                           file,
 
3674
                                                           (GDestroyNotify) g_object_unref)) {
3335
3675
                        g_debug ("  Deleting previous unhandled UPDATED event");
3336
 
                        g_object_unref (elem->data);
3337
 
                        g_queue_delete_link (fs->priv->items_updated, elem);
3338
3676
                }
3339
3677
 
3340
 
                elem = g_queue_find_custom (fs->priv->items_created, file, compare_files);
3341
 
 
3342
 
                if (elem) {
3343
 
                        /* Created event still in the queue,
 
3678
                if (tracker_priority_queue_foreach_remove (fs->priv->items_updated,
 
3679
                                                           (GEqualFunc) g_file_equal,
 
3680
                                                           file,
 
3681
                                                           (GDestroyNotify) g_object_unref)) {
 
3682
                        /* Created event was still in the queue,
3344
3683
                         * remove it and ignore the current event
3345
3684
                         */
3346
3685
                        g_debug ("  Found matching unhandled CREATED event, removing file altogether");
3347
 
                        g_object_unref (elem->data);
3348
 
                        g_queue_delete_link (fs->priv->items_created, elem);
3349
3686
                        return FALSE;
3350
3687
                }
3351
3688
 
3352
3689
                return TRUE;
3353
3690
        case QUEUE_MOVED:
 
3691
                if (tracker_task_pool_find (fs->priv->writeback_pool, file)) {
 
3692
                        /* If the origin file is also being written back,
 
3693
                         * cancel it as this is an external operation.
 
3694
                         */
 
3695
                        cancel_writeback_task (fs, file);
 
3696
                }
 
3697
 
3354
3698
                /* Kill any events on other_file (The dest one), since it will be rewritten anyway */
3355
 
                elem = g_queue_find_custom (fs->priv->items_created, other_file, compare_files);
3356
 
 
3357
 
                if (elem) {
 
3699
                if (tracker_priority_queue_foreach_remove (fs->priv->items_created,
 
3700
                                                           (GEqualFunc) g_file_equal,
 
3701
                                                           other_file,
 
3702
                                                           (GDestroyNotify) g_object_unref)) {
3358
3703
                        g_debug ("  Removing previous unhandled CREATED event for dest file, will be rewritten anyway");
3359
 
                        g_object_unref (elem->data);
3360
 
                        g_queue_delete_link (fs->priv->items_created, elem);
3361
3704
                }
3362
3705
 
3363
 
                elem = g_queue_find_custom (fs->priv->items_updated, other_file, compare_files);
3364
 
 
3365
 
                if (elem) {
 
3706
                if (tracker_priority_queue_foreach_remove (fs->priv->items_updated,
 
3707
                                                           (GEqualFunc) g_file_equal,
 
3708
                                                           other_file,
 
3709
                                                           (GDestroyNotify) g_object_unref)) {
3366
3710
                        g_debug ("  Removing previous unhandled UPDATED event for dest file, will be rewritten anyway");
3367
 
                        g_object_unref (elem->data);
3368
 
                        g_queue_delete_link (fs->priv->items_updated, elem);
3369
3711
                }
3370
3712
 
3371
3713
                /* Now check file (Origin one) */
3372
 
                elem = g_queue_find_custom (fs->priv->items_created, file, compare_files);
3373
 
 
3374
 
                if (elem) {
3375
 
                        /* If source file was created, replace the
3376
 
                         * GFile there, we assume all posterior updates
 
3714
                if (tracker_priority_queue_foreach_remove (fs->priv->items_created,
 
3715
                                                           (GEqualFunc) g_file_equal,
 
3716
                                                           file,
 
3717
                                                           (GDestroyNotify) g_object_unref)) {
 
3718
                        /* If source file was created, replace it with
 
3719
                         * a create event for the destination file, and
 
3720
                         * discard this event.
 
3721
                         *
 
3722
                         * We assume all posterior updates
3377
3723
                         * have been merged together previously by this
3378
3724
                         * same function.
3379
3725
                         */
3380
3726
                        g_debug ("  Found matching unhandled CREATED event "
3381
3727
                                 "for source file, merging both events together");
3382
 
                        g_object_unref (elem->data);
3383
 
                        elem->data = g_object_ref (other_file);
 
3728
                        tracker_priority_queue_add (fs->priv->items_created,
 
3729
                                                    g_object_ref (other_file),
 
3730
                                                    G_PRIORITY_DEFAULT);
3384
3731
 
3385
3732
                        return FALSE;
3386
3733
                }
3387
3734
 
3388
 
                elem = g_queue_find_custom (fs->priv->items_moved, file, compare_moved_files);
3389
 
 
3390
 
                if (elem) {
3391
 
                        ItemMovedData *data = elem->data;
3392
 
 
 
3735
                move_data = tracker_priority_queue_find (fs->priv->items_moved, NULL,
 
3736
                                                         (GEqualFunc) moved_files_equal, file);
 
3737
                if (move_data) {
3393
3738
                        /* Origin file was the dest of a previous
3394
3739
                         * move operation, merge these together.
3395
3740
                         */
3396
3741
                        g_debug ("  Source file is the destination of a previous "
3397
3742
                                 "unhandled MOVED event, merging both events together");
3398
 
                        g_object_unref (data->file);
3399
 
                        data->file = g_object_ref (other_file);
 
3743
                        g_object_unref (move_data->file);
 
3744
                        move_data->file = g_object_ref (other_file);
3400
3745
                        return FALSE;
3401
3746
                }
3402
3747
 
3420
3765
        gchar *uri;
3421
3766
 
3422
3767
        fs = user_data;
 
3768
 
 
3769
        /* Writeback tasks would receive an updated after move,
 
3770
         * consequence of the data being written back in the
 
3771
         * copy, and its monitor events being propagated to
 
3772
         * the destination file.
 
3773
         */
 
3774
        if (remove_writeback_task (fs, file)) {
 
3775
                item_queue_handlers_set_up (fs);
 
3776
                return;
 
3777
        }
 
3778
 
3423
3779
        should_process = should_check_file (fs, file, is_directory);
3424
3780
 
3425
3781
        uri = g_file_get_uri (file);
3432
3788
        if (should_process) {
3433
3789
                if (is_directory &&
3434
3790
                    should_recurse_for_directory (fs, file)) {
3435
 
                        tracker_miner_fs_directory_add_internal (fs, file);
 
3791
                        tracker_miner_fs_directory_add_internal (fs, file,
 
3792
                                                                 G_PRIORITY_DEFAULT);
3436
3793
                } else {
3437
3794
                        trace_eq_push_tail ("CREATED", file, "On monitor event");
3438
 
                        g_queue_push_tail (fs->priv->items_created,
3439
 
                                           g_object_ref (file));
 
3795
                        tracker_priority_queue_add (fs->priv->items_created,
 
3796
                                                    g_object_ref (file),
 
3797
                                                    G_PRIORITY_DEFAULT);
3440
3798
 
3441
3799
                        item_queue_handlers_set_up (fs);
3442
3800
                }
3468
3826
        if (should_process &&
3469
3827
            check_item_queues (fs, QUEUE_UPDATED, file, NULL)) {
3470
3828
                trace_eq_push_tail ("UPDATED", file, "On monitor event");
3471
 
                g_queue_push_tail (fs->priv->items_updated,
3472
 
                                   g_object_ref (file));
 
3829
                tracker_priority_queue_add (fs->priv->items_updated,
 
3830
                                            g_object_ref (file),
 
3831
                                            G_PRIORITY_DEFAULT);
3473
3832
 
3474
3833
                item_queue_handlers_set_up (fs);
3475
3834
        }
3506
3865
                                    GINT_TO_POINTER (TRUE));
3507
3866
 
3508
3867
                trace_eq_push_tail ("UPDATED", file, "On monitor event (attributes)");
3509
 
                g_queue_push_tail (fs->priv->items_updated,
3510
 
                                   g_object_ref (file));
 
3868
                tracker_priority_queue_add (fs->priv->items_updated,
 
3869
                                            g_object_ref (file),
 
3870
                                            G_PRIORITY_DEFAULT);
3511
3871
 
3512
3872
                item_queue_handlers_set_up (fs);
3513
3873
        }
3538
3898
        if (should_process &&
3539
3899
            check_item_queues (fs, QUEUE_DELETED, file, NULL)) {
3540
3900
                trace_eq_push_tail ("DELETED", file, "On monitor event");
3541
 
                g_queue_push_tail (fs->priv->items_deleted,
3542
 
                                   g_object_ref (file));
 
3901
                tracker_priority_queue_add (fs->priv->items_deleted,
 
3902
                                            g_object_ref (file),
 
3903
                                            G_PRIORITY_DEFAULT);
3543
3904
 
3544
3905
                item_queue_handlers_set_up (fs);
3545
3906
        }
3593
3954
                                         "(move monitor event, source unknown)",
3594
3955
                                         uri);
3595
3956
                                /* If the source is not monitored, we need to crawl it. */
3596
 
                                tracker_miner_fs_directory_add_internal (fs, other_file);
 
3957
                                tracker_miner_fs_directory_add_internal (fs, other_file,
 
3958
                                                                         G_PRIORITY_DEFAULT);
3597
3959
                                g_free (uri);
3598
3960
                        }
3599
3961
                }
3633
3995
                                /* Source file was not stored, check dest file as new */
3634
3996
                                if (!is_directory ||
3635
3997
                                    !should_recurse_for_directory (fs, other_file)) {
3636
 
                                        trace_eq_push_tail ("CREATED", other_file, "On move monitor event");
3637
 
                                        g_queue_push_tail (fs->priv->items_created,
3638
 
                                                           g_object_ref (other_file));
 
3998
                                        if (check_item_queues (fs, QUEUE_CREATED, other_file, NULL)) {
 
3999
                                                trace_eq_push_tail ("CREATED", other_file, "On move monitor event");
 
4000
                                                tracker_priority_queue_add (fs->priv->items_created,
 
4001
                                                                            g_object_ref (other_file),
 
4002
                                                                            G_PRIORITY_DEFAULT);
3639
4003
 
3640
 
                                        item_queue_handlers_set_up (fs);
 
4004
                                                item_queue_handlers_set_up (fs);
 
4005
                                        }
3641
4006
                                } else {
3642
 
                                        tracker_miner_fs_directory_add_internal (fs, other_file);
 
4007
                                        tracker_miner_fs_directory_add_internal (fs, other_file,
 
4008
                                                                                 G_PRIORITY_DEFAULT);
3643
4009
                                }
3644
4010
                        }
3645
4011
                        /* Else, do nothing else */
3653
4019
                        /* Delete old file */
3654
4020
                        if (check_item_queues (fs, QUEUE_DELETED, file, NULL)) {
3655
4021
                                trace_eq_push_tail ("DELETED", file, "On move monitor event");
3656
 
                                g_queue_push_tail (fs->priv->items_deleted,
3657
 
                                                   g_object_ref (file));
 
4022
                                tracker_priority_queue_add (fs->priv->items_deleted,
 
4023
                                                            g_object_ref (file),
 
4024
                                                            G_PRIORITY_DEFAULT);
3658
4025
                                item_queue_handlers_set_up (fs);
3659
4026
                        }
3660
4027
                } else {
3668
4035
                        /* Move old file to new file */
3669
4036
                        if (check_item_queues (fs, QUEUE_MOVED, file, other_file)) {
3670
4037
                                trace_eq_push_tail_2 ("MOVED", file, other_file, "On monitor event");
3671
 
                                g_queue_push_tail (fs->priv->items_moved,
3672
 
                                                   item_moved_data_new (other_file, file));
 
4038
                                tracker_priority_queue_add (fs->priv->items_moved,
 
4039
                                                            item_moved_data_new (other_file, file),
 
4040
                                                            G_PRIORITY_DEFAULT);
3673
4041
                                item_queue_handlers_set_up (fs);
3674
4042
                        }
3675
4043
                }
3717
4085
                         * in the store, put in deleted queue.
3718
4086
                         */
3719
4087
                        trace_eq_push_tail ("DELETED", file, "while crawling directory");
3720
 
                        g_queue_push_tail (fs->priv->items_deleted,
3721
 
                                           g_object_ref (file));
 
4088
                        tracker_priority_queue_add (fs->priv->items_deleted,
 
4089
                                                    g_object_ref (file),
 
4090
                                                    G_PRIORITY_DEFAULT);
3722
4091
                }
3723
4092
        } else {
3724
4093
                gboolean should_change_index;
3789
4158
                 * -mtime_checking is TRUE.
3790
4159
                 */
3791
4160
                if (fs->priv->been_crawled || fs->priv->mtime_checking) {
3792
 
                        /* Set quark so that before trying to add the item we first
3793
 
                         * check for its existence. */
 
4161
                        /* Set quark to identify item found during crawling */
3794
4162
                        g_object_set_qdata (G_OBJECT (parent),
3795
 
                                            fs->priv->quark_check_existence,
 
4163
                                            fs->priv->quark_directory_found_crawling,
3796
4164
                                            GINT_TO_POINTER (TRUE));
3797
4165
 
3798
4166
                        /* Before adding the monitor, start notifying the store
3799
4167
                         * about the new directory, so that if any file event comes
3800
4168
                         * afterwards, the directory is already in store. */
3801
4169
                        trace_eq_push_tail ("CREATED", parent, "while crawling directory, parent");
3802
 
                        g_queue_push_tail (fs->priv->items_created,
3803
 
                                           g_object_ref (parent));
 
4170
                        tracker_priority_queue_add (fs->priv->items_created,
 
4171
                                                    g_object_ref (parent),
 
4172
                                                    G_PRIORITY_DEFAULT);
3804
4173
                        item_queue_handlers_set_up (fs);
3805
4174
 
3806
4175
                        /* As we already added here, specify that it shouldn't be added
3881
4250
        g_slice_free (CrawledDirectoryData, data);
3882
4251
}
3883
4252
 
 
4253
/* Returns TRUE if file equals to
 
4254
 * other_file, or is a child of it
 
4255
 */
 
4256
static gboolean
 
4257
file_equal_or_descendant (GFile *file,
 
4258
                          GFile *prefix)
 
4259
{
 
4260
        if (g_file_equal (file, prefix) ||
 
4261
            g_file_has_prefix (file, prefix)) {
 
4262
                return TRUE;
 
4263
        }
 
4264
 
 
4265
        return FALSE;
 
4266
}
 
4267
 
 
4268
static gboolean
 
4269
crawled_directory_contains_file (CrawledDirectoryData *data,
 
4270
                                 GFile                *file)
 
4271
{
 
4272
        return file_equal_or_descendant (file, data->tree->data);
 
4273
}
 
4274
 
3884
4275
static void
3885
4276
crawler_directory_crawled_cb (TrackerCrawler *crawler,
3886
4277
                              GFile          *directory,
3905
4296
         * further data is left there.
3906
4297
         */
3907
4298
        dir_data = crawled_directory_data_new (tree);
3908
 
        g_queue_push_tail (fs->priv->crawled_directories, dir_data);
 
4299
        tracker_priority_queue_add (fs->priv->crawled_directories,
 
4300
                                    dir_data,
 
4301
                                    G_PRIORITY_DEFAULT);
3909
4302
 
3910
4303
        /* Update stats */
3911
4304
        fs->priv->directories_found += directories_found;
3969
4362
                return FALSE;
3970
4363
        }
3971
4364
 
3972
 
        if (!fs->priv->directories) {
 
4365
        if (tracker_priority_queue_is_empty (fs->priv->directories)) {
3973
4366
                if (fs->priv->current_iri_cache_parent) {
3974
4367
                        /* Unset parent folder so caches are regenerated */
3975
4368
                        g_object_unref (fs->priv->current_iri_cache_parent);
3994
4387
                fs->priv->timer = g_timer_new ();
3995
4388
        }
3996
4389
 
3997
 
        fs->priv->current_directory = fs->priv->directories->data;
3998
 
        fs->priv->directories = g_list_remove (fs->priv->directories,
3999
 
                                                  fs->priv->current_directory);
 
4390
        fs->priv->current_directory = tracker_priority_queue_pop (fs->priv->directories,
 
4391
                                                                  NULL);
4000
4392
 
4001
4393
        path = g_file_get_path (fs->priv->current_directory->file);
4002
4394
        path_utf8 = g_filename_to_utf8 (path, -1, NULL, NULL, NULL);
4113
4505
        return recurse;
4114
4506
}
4115
4507
 
4116
 
 
4117
 
/* Returns 0 if 'a' and 'b' point to the same diretory, OR if
 
4508
static gboolean
 
4509
directory_equals_or_contains (gconstpointer a,
 
4510
                              gconstpointer b)
 
4511
{
 
4512
        DirectoryData *dda = (DirectoryData *) a;
 
4513
        DirectoryData *ddb = (DirectoryData *) b;
 
4514
 
 
4515
        return directory_contains_file (dda, ddb->file);
 
4516
}
 
4517
 
 
4518
 
 
4519
/* Returns 0 if 'a' and 'b' point to the same directory, OR if
4118
4520
 *  'b' is contained inside directory 'a' and 'a' is recursively
4119
4521
 *  indexed. */
4120
4522
static gint
4121
4523
directory_compare_cb (gconstpointer a,
4122
4524
                      gconstpointer b)
4123
4525
{
4124
 
        DirectoryData *dda = (DirectoryData *) a;
4125
 
        DirectoryData *ddb = (DirectoryData *) b;
4126
 
 
4127
 
        return (g_file_equal (dda->file, ddb->file) ||
4128
 
                (dda->recurse &&
4129
 
                 g_file_has_prefix (ddb->file, dda->file))) ? 0 : -1;
 
4526
        return directory_equals_or_contains (a, b) ? 0 : -1;
4130
4527
}
4131
4528
 
4132
4529
 
4136
4533
 */
4137
4534
static void
4138
4535
tracker_miner_fs_directory_add_internal (TrackerMinerFS *fs,
4139
 
                                         GFile          *file)
 
4536
                                         GFile          *file,
 
4537
                                         gint            priority)
4140
4538
{
4141
4539
        DirectoryData *data;
4142
4540
        gboolean recurse;
4145
4543
        data = directory_data_new (file, recurse);
4146
4544
 
4147
4545
        /* Only add if not already there */
4148
 
        if (!g_list_find_custom (fs->priv->directories,
4149
 
                                 data,
4150
 
                                 directory_compare_cb)) {
4151
 
                fs->priv->directories =
4152
 
                        g_list_append (fs->priv->directories,
4153
 
                                       directory_data_ref (data));
 
4546
        if (tracker_priority_queue_find (fs->priv->directories,
 
4547
                                         NULL,
 
4548
                                         directory_equals_or_contains,
 
4549
                                         data) == NULL) {
 
4550
                tracker_priority_queue_add (fs->priv->directories,
 
4551
                                            directory_data_ref (data),
 
4552
                                            priority);
4154
4553
 
4155
4554
                crawl_directories_start (fs);
4156
4555
        }
4190
4589
        }
4191
4590
 
4192
4591
        /* If not already in the list to process, add it */
4193
 
        if (!g_list_find_custom (fs->priv->directories,
4194
 
                                 dir_data,
4195
 
                                 directory_compare_cb)) {
4196
 
                fs->priv->directories =
4197
 
                        g_list_append (fs->priv->directories,
4198
 
                                       directory_data_ref (dir_data));
 
4592
        if (tracker_priority_queue_find (fs->priv->directories,
 
4593
                                         NULL,
 
4594
                                         directory_equals_or_contains,
 
4595
                                         dir_data) == NULL) {
 
4596
                tracker_priority_queue_add (fs->priv->directories,
 
4597
                                            directory_data_ref (dir_data),
 
4598
                                            G_PRIORITY_DEFAULT);
4199
4599
 
4200
4600
                crawl_directories_start (fs);
4201
4601
        }
4204
4604
}
4205
4605
 
4206
4606
static void
4207
 
check_files_removal (GQueue *queue,
4208
 
                     GFile  *parent)
4209
 
{
4210
 
        GList *l;
4211
 
 
4212
 
        l = queue->head;
4213
 
 
4214
 
        while (l) {
4215
 
                GFile *file = l->data;
4216
 
                GList *link = l;
4217
 
 
4218
 
                l = l->next;
4219
 
 
4220
 
                if (g_file_equal (file, parent) ||
4221
 
                    g_file_has_prefix (file, parent)) {
4222
 
                        g_queue_delete_link (queue, link);
4223
 
                        g_object_unref (file);
4224
 
                }
4225
 
        }
4226
 
}
4227
 
 
4228
 
static void
4229
 
processing_pool_cancel_foreach (gpointer data,
4230
 
                                gpointer user_data)
4231
 
{
4232
 
        TrackerProcessingTask *task = data;
 
4607
task_pool_cancel_foreach (gpointer data,
 
4608
                          gpointer user_data)
 
4609
{
 
4610
        TrackerTask *task = data;
4233
4611
        GFile *file = user_data;
4234
4612
        GFile *task_file;
4235
4613
        UpdateProcessingTaskContext *ctxt;
4236
4614
 
4237
 
        task_file = tracker_processing_task_get_file (task);
4238
 
        ctxt = tracker_processing_task_get_context (task);
 
4615
        ctxt = tracker_task_get_data (task);
 
4616
        task_file = tracker_task_get_file (task);
4239
4617
 
4240
4618
        if (ctxt &&
4241
4619
            ctxt->cancellable &&
4246
4624
        }
4247
4625
}
4248
4626
 
 
4627
static void
 
4628
writeback_pool_cancel_foreach (gpointer data,
 
4629
                               gpointer user_data)
 
4630
{
 
4631
        GFile *task_file, *file;
 
4632
        TrackerTask *task;
 
4633
 
 
4634
        task = data;
 
4635
        file = user_data;
 
4636
        task_file = tracker_task_get_file (task);
 
4637
 
 
4638
        if (!file ||
 
4639
            g_file_equal (task_file, file) ||
 
4640
            g_file_has_prefix (task_file, file)) {
 
4641
                ItemWritebackData *task_data;
 
4642
 
 
4643
                task_data = tracker_task_get_data (task);
 
4644
                g_cancellable_cancel (task_data->cancellable);
 
4645
        }
 
4646
}
 
4647
 
4249
4648
/**
4250
4649
 * tracker_miner_fs_directory_remove:
4251
4650
 * @fs: a #TrackerMinerFS
4275
4674
        g_debug ("Removing directory");
4276
4675
 
4277
4676
        /* Cancel all pending tasks on files inside the path given by file */
4278
 
        tracker_processing_pool_foreach (priv->processing_pool,
4279
 
                                         processing_pool_cancel_foreach,
4280
 
                                         file);
 
4677
        tracker_task_pool_foreach (priv->task_pool,
 
4678
                                   task_pool_cancel_foreach,
 
4679
                                   file);
4281
4680
 
4282
4681
        g_debug ("  Cancelled processing pool tasks at %f\n", g_timer_elapsed (timer, NULL));
4283
4682
 
 
4683
        tracker_task_pool_foreach (priv->writeback_pool,
 
4684
                                   writeback_pool_cancel_foreach,
 
4685
                                   file);
 
4686
        g_debug ("  Cancelled writeback pool tasks at %f\n",
 
4687
                 g_timer_elapsed (timer, NULL));
 
4688
 
4284
4689
        if (fs->priv->current_directory) {
4285
4690
                GFile *current_file;
4286
4691
 
4294
4699
                }
4295
4700
        }
4296
4701
 
4297
 
        dirs = fs->priv->directories;
4298
 
 
4299
 
        while (dirs) {
4300
 
                DirectoryData *data = dirs->data;
4301
 
                GList *link = dirs;
4302
 
 
4303
 
                dirs = dirs->next;
4304
 
 
4305
 
                if (g_file_equal (file, data->file) ||
4306
 
                    g_file_has_prefix (file, data->file)) {
4307
 
                        directory_data_unref (data);
4308
 
                        fs->priv->directories = g_list_delete_link (fs->priv->directories, link);
4309
 
                        return_val = TRUE;
4310
 
                }
4311
 
        }
4312
 
 
 
4702
        tracker_priority_queue_foreach_remove (fs->priv->directories,
 
4703
                                               (GEqualFunc) directory_contains_file,
 
4704
                                               file,
 
4705
                                               (GDestroyNotify) directory_data_unref);
4313
4706
        dirs = fs->priv->config_directories;
4314
4707
 
4315
4708
        while (dirs) {
4325
4718
                }
4326
4719
        }
4327
4720
 
4328
 
        dirs = fs->priv->crawled_directories->head;
4329
 
 
4330
 
        while (dirs) {
4331
 
                CrawledDirectoryData *data = dirs->data;
4332
 
                GList *link = dirs;
4333
 
 
4334
 
                dirs = dirs->next;
4335
 
 
4336
 
                if (g_file_equal (file, data->tree->data) ||
4337
 
                    g_file_has_prefix (file, data->tree->data)) {
4338
 
                        crawled_directory_data_free (data);
4339
 
                        g_queue_delete_link (priv->crawled_directories, link);
4340
 
                        return_val = TRUE;
4341
 
                }
4342
 
        }
 
4721
        tracker_priority_queue_foreach_remove (fs->priv->crawled_directories,
 
4722
                                               (GEqualFunc) crawled_directory_contains_file,
 
4723
                                               file,
 
4724
                                               (GDestroyNotify) crawled_directory_data_free);
4343
4725
 
4344
4726
        /* Remove anything contained in the removed directory
4345
4727
         * from all relevant processing queues.
4346
4728
         */
4347
 
        check_files_removal (priv->items_updated, file);
4348
 
        check_files_removal (priv->items_created, file);
 
4729
        tracker_priority_queue_foreach_remove (priv->items_updated,
 
4730
                                               (GEqualFunc) file_equal_or_descendant,
 
4731
                                               file,
 
4732
                                               (GDestroyNotify) g_object_unref);
 
4733
        tracker_priority_queue_foreach_remove (priv->items_created,
 
4734
                                               (GEqualFunc) file_equal_or_descendant,
 
4735
                                               file,
 
4736
                                               (GDestroyNotify) g_object_unref);
4349
4737
 
4350
4738
        g_debug ("  Removed files at %f\n", g_timer_elapsed (timer, NULL));
4351
4739
 
4384
4772
                /* And remove all info about the directory (recursively)
4385
4773
                 * from the store... */
4386
4774
                trace_eq_push_tail ("DELETED", file, "on remove full");
4387
 
                g_queue_push_tail (fs->priv->items_deleted,
4388
 
                                   g_object_ref (file));
 
4775
                tracker_priority_queue_add (fs->priv->items_deleted,
 
4776
                                            g_object_ref (file),
 
4777
                                            G_PRIORITY_DEFAULT);
4389
4778
                item_queue_handlers_set_up (fs);
4390
4779
 
4391
4780
                return TRUE;
4428
4817
 
4429
4818
        for (p = parents; p; p = p->next) {
4430
4819
                trace_eq_push_tail ("UPDATED", p->data, "checking file parents");
4431
 
                g_queue_push_tail (fs->priv->items_updated, p->data);
 
4820
                tracker_priority_queue_add (fs->priv->items_updated,
 
4821
                                            p->data,
 
4822
                                            G_PRIORITY_DEFAULT);
4432
4823
        }
4433
4824
 
4434
4825
        g_list_free (parents);
4437
4828
}
4438
4829
 
4439
4830
/**
 
4831
 * tracker_miner_fs_check_file_with_priority:
 
4832
 * @fs: a #TrackerMinerFS
 
4833
 * @file: #GFile for the file to check
 
4834
 * @priority: the priority of the check task
 
4835
 * @check_parents: whether to check parents and eligibility or not
 
4836
 *
 
4837
 * Tells the filesystem miner to check and index a file at
 
4838
 * a given priority, this file must be part of the usual
 
4839
 * crawling directories of #TrackerMinerFS. See
 
4840
 * tracker_miner_fs_directory_add().
 
4841
 *
 
4842
 * Since: 0.10
 
4843
 **/
 
4844
void
 
4845
tracker_miner_fs_check_file_with_priority (TrackerMinerFS *fs,
 
4846
                                           GFile          *file,
 
4847
                                           gint            priority,
 
4848
                                           gboolean        check_parents)
 
4849
{
 
4850
        gboolean should_process = TRUE;
 
4851
        gchar *path;
 
4852
 
 
4853
        g_return_if_fail (TRACKER_IS_MINER_FS (fs));
 
4854
        g_return_if_fail (G_IS_FILE (file));
 
4855
 
 
4856
        if (check_parents) {
 
4857
                should_process = should_check_file (fs, file, FALSE);
 
4858
        }
 
4859
 
 
4860
        path = g_file_get_path (file);
 
4861
 
 
4862
        g_debug ("%s:'%s' (FILE) (requested by application)",
 
4863
                 should_process ? "Found " : "Ignored",
 
4864
                 path);
 
4865
 
 
4866
        if (should_process) {
 
4867
                if (check_parents && !check_file_parents (fs, file)) {
 
4868
                        return;
 
4869
                }
 
4870
 
 
4871
                trace_eq_push_tail ("UPDATED", file, "Requested by application");
 
4872
                tracker_priority_queue_add (fs->priv->items_updated,
 
4873
                                            g_object_ref (file),
 
4874
                                            priority);
 
4875
 
 
4876
                item_queue_handlers_set_up (fs);
 
4877
        }
 
4878
 
 
4879
        g_free (path);
 
4880
}
 
4881
 
 
4882
 
 
4883
/**
 
4884
 * tracker_miner_fs_writeback_file:
 
4885
 * @fs: a #TrackerMinerFS
 
4886
 * @file: #GFile for the file to check
 
4887
 * @rdf_types: A #GStrv with rdf types
 
4888
 * @results: A array of results from the preparation query
 
4889
 *
 
4890
 * Tells the filesystem miner to writeback a file.
 
4891
 *
 
4892
 * Since: 0.10.20
 
4893
 **/
 
4894
void
 
4895
tracker_miner_fs_writeback_file (TrackerMinerFS *fs,
 
4896
                                 GFile          *file,
 
4897
                                 GStrv           rdf_types,
 
4898
                                 GPtrArray      *results)
 
4899
{
 
4900
        gchar *path;
 
4901
        ItemWritebackData *data;
 
4902
 
 
4903
        g_return_if_fail (TRACKER_IS_MINER_FS (fs));
 
4904
        g_return_if_fail (G_IS_FILE (file));
 
4905
 
 
4906
        path = g_file_get_path (file);
 
4907
 
 
4908
        g_debug ("Performing write-back:'%s' (requested by application)", path);
 
4909
 
 
4910
        trace_eq_push_tail ("WRITEBACK", file, "Requested by application");
 
4911
 
 
4912
        data = item_writeback_data_new (file, rdf_types, results);
 
4913
        tracker_priority_queue_add (fs->priv->items_writeback, data,
 
4914
                                    G_PRIORITY_DEFAULT);
 
4915
 
 
4916
        item_queue_handlers_set_up (fs);
 
4917
 
 
4918
        g_free (path);
 
4919
}
 
4920
 
 
4921
/**
 
4922
 * tracker_miner_fs_writeback_notify:
 
4923
 * @fs: a #TrackerMinerFS
 
4924
 * @file: a #GFile
 
4925
 * @error: a #GError with the error that happened during processing, or %NULL.
 
4926
 *
 
4927
 * Notifies @fs that all writing back on @file has been finished, if any error
 
4928
 * happened during file data processing, it should be passed in @error, else
 
4929
 * that parameter will contain %NULL to reflect success.
 
4930
 *
 
4931
 * Since: 0.10.20
 
4932
 **/
 
4933
void
 
4934
tracker_miner_fs_writeback_notify (TrackerMinerFS *fs,
 
4935
                                   GFile          *file,
 
4936
                                   const GError   *error)
 
4937
{
 
4938
        TrackerTask *task;
 
4939
 
 
4940
        g_return_if_fail (TRACKER_IS_MINER_FS (fs));
 
4941
        g_return_if_fail (G_IS_FILE (file));
 
4942
 
 
4943
        fs->priv->total_files_notified++;
 
4944
 
 
4945
        task = tracker_task_pool_find (fs->priv->writeback_pool, file);
 
4946
 
 
4947
        if (!task) {
 
4948
                gchar *uri;
 
4949
 
 
4950
                uri = g_file_get_uri (file);
 
4951
                g_critical ("%s has notified that file '%s' has been written back, "
 
4952
                            "but that file was not in the task pool. "
 
4953
                            "This is an implementation error, please ensure that "
 
4954
                            "tracker_miner_fs_writeback_notify() is called on the same "
 
4955
                            "GFile that is passed in ::writeback-file, and that this"
 
4956
                            "signal didn't return FALSE for it",
 
4957
                            G_OBJECT_TYPE_NAME (fs), uri);
 
4958
                g_free (uri);
 
4959
        } else if (error) {
 
4960
                g_warning ("Writeback operation failed: %s", error->message);
 
4961
 
 
4962
                /* We don't expect any further monitor
 
4963
                 * events on the original file.
 
4964
                 */
 
4965
                tracker_task_pool_remove (fs->priv->writeback_pool, task);
 
4966
                tracker_task_unref (task);
 
4967
 
 
4968
                item_queue_handlers_set_up (fs);
 
4969
        } else {
 
4970
                ItemWritebackData *data;
 
4971
 
 
4972
                data = tracker_task_get_data (task);
 
4973
                data->notified = TRUE;
 
4974
        }
 
4975
 
 
4976
        /* Check monitor_item_updated_cb() for the remainder of this notify,
 
4977
         * as the last event happening on the written back file would be an
 
4978
         * UPDATED event caused by the changes on the cloned file, followed
 
4979
         * by a MOVE onto the original file, so the delayed update happens
 
4980
         * on the destination file.
 
4981
         */
 
4982
}
 
4983
 
 
4984
/**
4440
4985
 * tracker_miner_fs_check_file:
4441
4986
 * @fs: a #TrackerMinerFS
4442
4987
 * @file: #GFile for the file to check
4453
4998
                             GFile          *file,
4454
4999
                             gboolean        check_parents)
4455
5000
{
 
5001
        tracker_miner_fs_check_file_with_priority (fs, file,
 
5002
                                                   G_PRIORITY_HIGH,
 
5003
                                                   check_parents);
 
5004
}
 
5005
 
 
5006
/**
 
5007
 * tracker_miner_fs_check_directory_with_priority:
 
5008
 * @fs: a #TrackerMinerFS
 
5009
 * @file: #GFile for the directory to check
 
5010
 * @priority: the priority of the check task
 
5011
 * @check_parents: whether to check parents and eligibility or not
 
5012
 *
 
5013
 * Tells the filesystem miner to check and index a directory at
 
5014
 * a given priority, this file must be part of the usual crawling
 
5015
 * directories of #TrackerMinerFS. See tracker_miner_fs_directory_add().
 
5016
 *
 
5017
 * Since: 0.10
 
5018
 **/
 
5019
void
 
5020
tracker_miner_fs_check_directory_with_priority (TrackerMinerFS *fs,
 
5021
                                                GFile          *file,
 
5022
                                                gint            priority,
 
5023
                                                gboolean        check_parents)
 
5024
{
4456
5025
        gboolean should_process = TRUE;
4457
5026
        gchar *path;
4458
5027
 
4460
5029
        g_return_if_fail (G_IS_FILE (file));
4461
5030
 
4462
5031
        if (check_parents) {
4463
 
                should_process = should_check_file (fs, file, FALSE);
 
5032
                should_process = should_check_file (fs, file, TRUE);
4464
5033
        }
4465
5034
 
4466
5035
        path = g_file_get_path (file);
4467
5036
 
4468
 
        g_debug ("%s:'%s' (FILE) (requested by application)",
 
5037
        g_debug ("%s:'%s' (DIR) (requested by application)",
4469
5038
                 should_process ? "Found " : "Ignored",
4470
5039
                 path);
4471
5040
 
4474
5043
                        return;
4475
5044
                }
4476
5045
 
4477
 
                trace_eq_push_tail ("UPDATED", file, "Requested by application");
4478
 
                g_queue_push_tail (fs->priv->items_updated,
4479
 
                                   g_object_ref (file));
4480
 
 
4481
 
                item_queue_handlers_set_up (fs);
 
5046
                tracker_miner_fs_directory_add_internal (fs, file, priority);
4482
5047
        }
4483
5048
 
4484
5049
        g_free (path);
4501
5066
                                  GFile          *file,
4502
5067
                                  gboolean        check_parents)
4503
5068
{
4504
 
        gboolean should_process = TRUE;
4505
 
        gchar *path;
4506
 
 
4507
 
        g_return_if_fail (TRACKER_IS_MINER_FS (fs));
4508
 
        g_return_if_fail (G_IS_FILE (file));
4509
 
 
4510
 
        if (check_parents) {
4511
 
                should_process = should_check_file (fs, file, TRUE);
4512
 
        }
4513
 
 
4514
 
        path = g_file_get_path (file);
4515
 
 
4516
 
        g_debug ("%s:'%s' (DIR) (requested by application)",
4517
 
                 should_process ? "Found " : "Ignored",
4518
 
                 path);
4519
 
 
4520
 
        if (should_process) {
4521
 
                if (check_parents && !check_file_parents (fs, file)) {
4522
 
                        return;
4523
 
                }
4524
 
 
4525
 
                tracker_miner_fs_directory_add_internal (fs, file);
4526
 
        }
4527
 
 
4528
 
        g_free (path);
 
5069
        tracker_miner_fs_check_directory_with_priority (fs, file,
 
5070
                                                        G_PRIORITY_DEFAULT,
 
5071
                                                        check_parents);
4529
5072
}
4530
5073
 
4531
5074
/**
4545
5088
                              GFile          *file,
4546
5089
                              const GError   *error)
4547
5090
{
4548
 
        TrackerProcessingTask *task;
 
5091
        TrackerTask *task;
4549
5092
 
4550
5093
        g_return_if_fail (TRACKER_IS_MINER_FS (fs));
4551
5094
        g_return_if_fail (G_IS_FILE (file));
4552
5095
 
4553
5096
        fs->priv->total_files_notified++;
4554
5097
 
4555
 
        task = tracker_processing_pool_find_task (fs->priv->processing_pool,
4556
 
                                                  file,
4557
 
                                                  FALSE);
 
5098
        task = tracker_task_pool_find (fs->priv->task_pool, file);
4558
5099
 
4559
5100
        if (!task) {
4560
5101
                gchar *uri;
4653
5194
 *
4654
5195
 * Since: 0.8
4655
5196
 **/
4656
 
G_CONST_RETURN gchar *
 
5197
const gchar *
4657
5198
tracker_miner_fs_get_urn (TrackerMinerFS *fs,
4658
5199
                          GFile          *file)
4659
5200
{
4660
 
        TrackerProcessingTask *task;
 
5201
        TrackerTask *task;
4661
5202
 
4662
5203
        g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
4663
5204
        g_return_val_if_fail (G_IS_FILE (file), NULL);
4664
5205
 
4665
5206
        /* Check if found in currently processed data */
4666
 
        task = tracker_processing_pool_find_task (fs->priv->processing_pool,
4667
 
                                                  file,
4668
 
                                                  FALSE);
 
5207
        task = tracker_task_pool_find (fs->priv->task_pool, file);
4669
5208
 
4670
5209
        if (!task) {
4671
5210
                gchar *uri;
4681
5220
                UpdateProcessingTaskContext *ctxt;
4682
5221
 
4683
5222
                /* We are only storing the URN in the created/updated tasks */
4684
 
                ctxt = tracker_processing_task_get_context (task);
 
5223
                ctxt = tracker_task_get_data (task);
 
5224
 
4685
5225
                if (!ctxt) {
4686
5226
                        gchar *uri;
4687
5227
 
4745
5285
 *
4746
5286
 * Since: 0.8
4747
5287
 **/
4748
 
G_CONST_RETURN gchar *
 
5288
const gchar *
4749
5289
tracker_miner_fs_get_parent_urn (TrackerMinerFS *fs,
4750
5290
                                 GFile          *file)
4751
5291
{
4752
 
        TrackerProcessingTask *task;
 
5292
        TrackerTask *task;
4753
5293
 
4754
5294
        g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
4755
5295
        g_return_val_if_fail (G_IS_FILE (file), NULL);
4756
5296
 
4757
5297
        /* Check if found in currently processed data */
4758
 
        task = tracker_processing_pool_find_task (fs->priv->processing_pool,
4759
 
                                                  file,
4760
 
                                                  FALSE);
 
5298
        task = tracker_task_pool_find (fs->priv->task_pool, file);
4761
5299
 
4762
5300
        if (!task) {
4763
5301
                gchar *uri;
4773
5311
                UpdateProcessingTaskContext *ctxt;
4774
5312
 
4775
5313
                /* We are only storing the URN in the created/updated tasks */
4776
 
                ctxt = tracker_processing_task_get_context (task);
 
5314
                ctxt = tracker_task_get_data (task);
 
5315
 
4777
5316
                if (!ctxt) {
4778
5317
                        gchar *uri;
4779
5318
 
4799
5338
 
4800
5339
        g_message ("Forcing re-check on all index directories");
4801
5340
 
4802
 
        directories = g_list_copy (fs->priv->config_directories);
4803
 
        g_list_foreach (directories, (GFunc) directory_data_ref, NULL);
 
5341
        directories = fs->priv->config_directories;
4804
5342
 
4805
 
        fs->priv->directories = g_list_concat (fs->priv->directories, directories);
 
5343
        while (directories) {
 
5344
                tracker_priority_queue_add (fs->priv->directories,
 
5345
                                            directory_data_ref (directories->data),
 
5346
                                            G_PRIORITY_LOW);
 
5347
                directories = directories->next;
 
5348
        }
4806
5349
 
4807
5350
        crawl_directories_start (fs);
4808
5351
}
4887
5430
{
4888
5431
        g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), FALSE);
4889
5432
 
4890
 
        if (g_queue_get_length (fs->priv->items_deleted) > 0 ||
4891
 
            g_queue_get_length (fs->priv->items_created) > 0 ||
4892
 
            g_queue_get_length (fs->priv->items_updated) > 0 ||
4893
 
            g_queue_get_length (fs->priv->items_moved) > 0) {
 
5433
        if (!tracker_priority_queue_is_empty (fs->priv->crawled_directories) ||
 
5434
            !tracker_priority_queue_is_empty (fs->priv->items_deleted) ||
 
5435
            !tracker_priority_queue_is_empty (fs->priv->items_created) ||
 
5436
            !tracker_priority_queue_is_empty (fs->priv->items_updated) ||
 
5437
            !tracker_priority_queue_is_empty (fs->priv->items_moved) ||
 
5438
            !tracker_priority_queue_is_empty (fs->priv->items_writeback)) {
4894
5439
                return TRUE;
4895
5440
        }
4896
5441
 
5025
5570
        miner_fs_trace_queue_with_files (fs, "UPDATED", fs->priv->items_updated);
5026
5571
        miner_fs_trace_queue_with_files (fs, "DELETED", fs->priv->items_deleted);
5027
5572
        miner_fs_trace_queue_with_data  (fs, "MOVED",   fs->priv->items_moved);
 
5573
        miner_fs_trace_queue_with_files (fs, "WRITEBACK", fs->priv->items_writeback);
5028
5574
 
5029
5575
        return TRUE;
5030
5576
}