1
/* $Id: hspstream_collector.c,v 1.24 2008/08/21 19:55:43 kazimird Exp $
2
* ===========================================================================
5
* National Center for Biotechnology Information
7
* This software/database is a "United States Government Work" under the
8
* terms of the United States Copyright Act. It was written as part of
9
* the author's official duties as a United States Government employee and
10
* thus cannot be copyrighted. This software/database is freely available
11
* to the public for use. The National Library of Medicine and the U.S.
12
* Government have not placed any restriction on its use or reproduction.
14
* Although all reasonable efforts have been taken to ensure the accuracy
15
* and reliability of the software and data, the NLM and the U.S.
16
* Government do not and cannot warrant the performance or results that
17
* may be obtained by using this software or data. The NLM and the U.S.
18
* Government disclaim all warranties, express or implied, including
19
* warranties of performance, merchantability or fitness for any particular
22
* Please cite the author in any work or product based on this material.
24
* ===========================================================================
26
* Author: Ilya Dondoshansky
30
/** @file hspstream_collector.c
31
* Default implementation of the BlastHSPStream interface to save hits from
32
* a BLAST search, and subsequently return them in sorted order.
35
#ifndef SKIP_DOXYGEN_PROCESSING
36
static char const rcsid[] =
37
"$Id: hspstream_collector.c,v 1.24 2008/08/21 19:55:43 kazimird Exp $";
38
#endif /* SKIP_DOXYGEN_PROCESSING */
41
#include <algo/blast/core/hspstream_collector.h>
42
#include <algo/blast/core/blast_util.h>
44
/** Default hit saving stream methods */
46
/** Free the BlastHSPStream with its HSP list collector data structure.
47
* @param hsp_stream The HSP stream to free [in]
50
static BlastHSPStream*
51
s_BlastHSPListCollectorFree(BlastHSPStream* hsp_stream)
54
BlastHSPListCollectorData* stream_data =
55
(BlastHSPListCollectorData*) GetData(hsp_stream);
56
stream_data->x_lock = MT_LOCK_Delete(stream_data->x_lock);
57
SBlastHitsParametersFree(stream_data->blasthit_params);
58
Blast_HSPResultsFree(stream_data->results);
59
for (index=0; index < stream_data->num_hsplists; index++)
61
stream_data->sorted_hsplists[index] =
62
Blast_HSPListFree(stream_data->sorted_hsplists[index]);
64
sfree(stream_data->sort_by_score);
65
sfree(stream_data->sorted_hsplists);
71
/** callback used to sort HSP lists in order of decreasing OID
72
* @param x First HSP list [in]
73
* @param y Second HSP list [in]
74
* @return compare result
76
static int s_SortHSPListByOid(const void *x, const void *y)
78
BlastHSPList **xx = (BlastHSPList **)x;
79
BlastHSPList **yy = (BlastHSPList **)y;
80
return (*yy)->oid - (*xx)->oid;
83
/** Prohibit any future writing to the HSP stream when all results are written.
84
* Also perform sorting of results here to prepare them for reading.
85
* @param hsp_stream The HSP stream to close [in] [out]
88
s_BlastHSPListCollectorClose(BlastHSPStream* hsp_stream)
92
BlastHSPResults *results;
94
BlastHSPListCollectorData* stream_data =
95
(BlastHSPListCollectorData*) GetData(hsp_stream);
97
if (stream_data->results == NULL || stream_data->results_sorted)
100
if (stream_data->sort_by_score) {
101
if (stream_data->sort_by_score->sort_on_read) {
102
Blast_HSPResultsReverseSort(stream_data->results);
104
/* Reverse the order of HSP lists, because they will be returned
105
starting from end, for the sake of convenience */
106
Blast_HSPResultsReverseOrder(stream_data->results);
108
stream_data->results_sorted = TRUE;
109
stream_data->x_lock = MT_LOCK_Delete(stream_data->x_lock);
113
results = stream_data->results;
114
num_hsplists = stream_data->num_hsplists;
116
/* concatenate all the HSPLists from 'results' */
118
for (i = 0; i < results->num_queries; i++) {
120
BlastHitList *hitlist = results->hitlist_array[i];
124
/* grow the list if necessary */
126
if (num_hsplists + hitlist->hsplist_count >
127
stream_data->num_hsplists_alloc) {
129
Int4 alloc = MAX(num_hsplists + hitlist->hsplist_count + 100,
130
2 * stream_data->num_hsplists_alloc);
131
stream_data->num_hsplists_alloc = alloc;
132
stream_data->sorted_hsplists = (BlastHSPList **)realloc(
133
stream_data->sorted_hsplists,
134
alloc * sizeof(BlastHSPList *));
137
for (j = k = 0; j < hitlist->hsplist_count; j++) {
139
BlastHSPList *hsplist = hitlist->hsplist_array[j];
143
hsplist->query_index = i;
144
stream_data->sorted_hsplists[num_hsplists + k] = hsplist;
148
hitlist->hsplist_count = 0;
152
/* sort in order of decreasing subject OID. HSPLists will be
153
read out from the end of hsplist_array later */
155
stream_data->num_hsplists = num_hsplists;
156
if (num_hsplists > 1) {
157
qsort(stream_data->sorted_hsplists, num_hsplists,
158
sizeof(BlastHSPList *), s_SortHSPListByOid);
161
stream_data->results_sorted = TRUE;
162
stream_data->x_lock = MT_LOCK_Delete(stream_data->x_lock);
165
/** Read one HSP list from the results saved in an HSP list collector. Once an
166
* HSP list is read from the stream, it relinquishes ownership and removes it
167
* from the internal results data structure.
168
* @param hsp_stream The HSP stream to read from [in]
169
* @param hsp_list_out The read HSP list. [out]
170
* @return Success, error, or end of reading, when nothing left to read.
173
s_BlastHSPListCollectorRead(BlastHSPStream* hsp_stream,
174
BlastHSPList** hsp_list_out)
176
BlastHSPListCollectorData* stream_data =
177
(BlastHSPListCollectorData*) GetData(hsp_stream);
179
*hsp_list_out = NULL;
180
if (!stream_data->results)
181
return kBlastHSPStream_Eof;
183
/* If this stream is not yet closed for writing, close it. In particular,
184
this includes sorting of results.
185
NB: to lift the prohibition on write after the first read, the
186
following 2 lines should be removed, and stream closure for writing
187
should be done outside of the read function. */
188
if (!stream_data->results_sorted)
189
s_BlastHSPListCollectorClose(hsp_stream);
191
if (stream_data->sort_by_score) {
192
Int4 last_hsplist_index = -1, index = 0;
193
BlastHitList* hit_list = NULL;
194
BlastHSPResults* results = stream_data->results;
196
/* Find index of the first query that has results. */
197
for (index = stream_data->sort_by_score->first_query_index;
198
index < results->num_queries; ++index) {
199
if (results->hitlist_array[index] &&
200
results->hitlist_array[index]->hsplist_count > 0)
203
if (index >= results->num_queries)
204
return kBlastHSPStream_Eof;
206
stream_data->sort_by_score->first_query_index = index;
208
hit_list = results->hitlist_array[index];
209
last_hsplist_index = hit_list->hsplist_count - 1;
211
*hsp_list_out = hit_list->hsplist_array[last_hsplist_index];
212
/* Assign the query index here so the caller knows which query this HSP
214
(*hsp_list_out)->query_index = index;
215
/* Dequeue this HSP list by decrementing the HSPList count */
216
--hit_list->hsplist_count;
217
if (hit_list->hsplist_count == 0) {
218
/* Advance the first query index, without checking that the next
219
* query has results - that will be done on the next call. */
220
++stream_data->sort_by_score->first_query_index;
223
/* return the next HSPlist out of the collection stored */
225
if (!stream_data->num_hsplists)
226
return kBlastHSPStream_Eof;
229
stream_data->sorted_hsplists[--stream_data->num_hsplists];
232
return kBlastHSPStream_Success;
235
/** Write an HSP list to the collector HSP stream. The HSP stream assumes
236
* ownership of the HSP list and sets the dereferenced pointer to NULL.
237
* @param hsp_stream Stream to write to. [in] [out]
238
* @param hsp_list Pointer to the HSP list to save in the collector. [in]
239
* @return Success or error, if stream is already closed for writing.
242
s_BlastHSPListCollectorWrite(BlastHSPStream* hsp_stream,
243
BlastHSPList** hsp_list)
246
BlastHSPListCollectorData* stream_data =
247
(BlastHSPListCollectorData*) GetData(hsp_stream);
249
/** Lock the mutex, if necessary */
250
MT_LOCK_Do(stream_data->x_lock, eMT_Lock);
252
/** Prohibit writing after reading has already started. This prohibition
253
* can be lifted later. There is no inherent problem in using read and
254
* write in any order, except that sorting would have to be done on
255
* every read after a write.
257
if (stream_data->results_sorted) {
258
MT_LOCK_Do(stream_data->x_lock, eMT_Unlock);
259
return kBlastHSPStream_Error;
262
/* For RPS BLAST saving procedure is different, because HSPs from different
263
subjects are bundled in one HSP list */
264
if (Blast_ProgramIsRpsBlast(stream_data->program)) {
265
status = Blast_HSPResultsSaveRPSHSPList(stream_data->program,
266
stream_data->results, *hsp_list, stream_data->blasthit_params);
268
status = Blast_HSPResultsSaveHSPList(stream_data->program, stream_data->results,
269
*hsp_list, stream_data->blasthit_params);
273
MT_LOCK_Do(stream_data->x_lock, eMT_Unlock);
274
return kBlastHSPStream_Error;
276
/* Results structure is no longer sorted, even if it was before.
277
The following assignment is only necessary if the logic to prohibit
278
writing after the first read is removed. */
279
stream_data->results_sorted = FALSE;
281
/* Free the caller from this pointer's ownership. */
284
/** Unlock the mutex */
285
MT_LOCK_Do(stream_data->x_lock, eMT_Unlock);
287
return kBlastHSPStream_Success;
290
/* #define _DEBUG_VERBOSE 1 */
291
/** Merge two HSPStreams. The HSPs from the first stream are
292
* moved to the second stream.
293
* @param squery_blk Structure controlling the merge process [in]
294
* @param chunk_num Unique integer assigned to hsp_stream [in]
295
* @param hsp_stream The stream to merge [in][out]
296
* @param combined_hsp_stream The stream that will contain the
297
* HSPLists of the first stream [in][out]
300
s_BlastHSPListCollectorMerge(SSplitQueryBlk *squery_blk,
302
BlastHSPStream* hsp_stream,
303
BlastHSPStream* combined_hsp_stream)
306
BlastHSPListCollectorData* stream1 =
307
(BlastHSPListCollectorData*) GetData(hsp_stream);
308
BlastHSPListCollectorData* stream2 =
309
(BlastHSPListCollectorData*) GetData(combined_hsp_stream);
310
BlastHSPResults *results1 = stream1->results;
311
BlastHSPResults *results2 = stream2->results;
312
Int4 contexts_per_query = BLAST_GetNumberOfContexts(stream2->program);
314
Int4 num_queries = 0, num_ctx = 0, num_ctx_offsets = 0;
318
Uint4 *query_list = NULL, *offset_list = NULL, num_contexts = 0;
319
Int4 *context_list = NULL;
321
SplitQueryBlk_GetQueryIndicesForChunk(squery_blk, chunk_num, &query_list);
322
SplitQueryBlk_GetQueryContextsForChunk(squery_blk, chunk_num,
323
&context_list, &num_contexts);
324
SplitQueryBlk_GetContextOffsetsForChunk(squery_blk, chunk_num, &offset_list);
326
#if defined(_DEBUG_VERBOSE)
327
fprintf(stderr, "Chunk %d\n", chunk_num);
328
fprintf(stderr, "Queries : ");
329
for (num_queries = 0; query_list[num_queries] != UINT4_MAX; num_queries++)
330
fprintf(stderr, "%d ", query_list[num_queries]);
331
fprintf(stderr, "\n");
332
fprintf(stderr, "Contexts : ");
333
for (num_ctx = 0; num_ctx < num_contexts; num_ctx++)
334
fprintf(stderr, "%d ", context_list[num_ctx]);
335
fprintf(stderr, "\n");
336
fprintf(stderr, "Context starting offsets : ");
337
for (num_ctx_offsets = 0; offset_list[num_ctx_offsets] != UINT4_MAX;
339
fprintf(stderr, "%d ", offset_list[num_ctx_offsets]);
340
fprintf(stderr, "\n");
341
#elif defined(_DEBUG)
342
for (num_queries = 0; query_list[num_queries] != UINT4_MAX; num_queries++) ;
343
for (num_ctx = 0, max_ctx = INT4_MIN; num_ctx < num_contexts; num_ctx++)
344
max_ctx = MAX(max_ctx, context_list[num_ctx]);
345
for (num_ctx_offsets = 0; offset_list[num_ctx_offsets] != UINT4_MAX;
349
for (i = 0; i < results1->num_queries; i++) {
350
BlastHitList *hitlist = results1->hitlist_array[i];
351
Int4 global_query = query_list[i];
352
Int4 split_points[NUM_FRAMES];
354
ASSERT(i < num_queries);
357
if (hitlist == NULL) {
358
#if defined(_DEBUG_VERBOSE)
359
fprintf(stderr, "No hits to query %d\n", global_query);
364
/* we will be mapping HSPs from the local context to
365
their place on the unsplit concatenated query. Once
366
that's done, overlapping HSPs need to get merged, and
367
to do that we must know the offset within each context
368
where the last chunk ended and the current chunk begins */
369
for (j = 0; j < contexts_per_query; j++) {
370
Int4 local_context = i * contexts_per_query + j;
371
split_points[context_list[local_context] % contexts_per_query] =
372
offset_list[local_context];
375
#if defined(_DEBUG_VERBOSE)
376
fprintf(stderr, "query %d split points: ", i);
377
for (j = 0; j < contexts_per_query; j++) {
378
fprintf(stderr, "%d ", split_points[j]);
380
fprintf(stderr, "\n");
383
for (j = 0; j < hitlist->hsplist_count; j++) {
384
BlastHSPList *hsplist = hitlist->hsplist_array[j];
386
for (k = 0; k < hsplist->hspcnt; k++) {
387
BlastHSP *hsp = hsplist->hsp_array[k];
388
Int4 local_context = hsp->context;
390
ASSERT(local_context <= max_ctx);
391
ASSERT(local_context < num_ctx);
392
ASSERT(local_context < num_ctx_offsets);
395
hsp->context = context_list[local_context];
396
hsp->query.offset += offset_list[local_context];
397
hsp->query.end += offset_list[local_context];
398
hsp->query.gapped_start += offset_list[local_context];
399
hsp->query.frame = BLAST_ContextToFrame(stream2->program,
403
hsplist->query_index = global_query;
406
Blast_HitListMerge(results1->hitlist_array + i,
407
results2->hitlist_array + global_query,
408
contexts_per_query, split_points,
409
SplitQueryBlk_GetChunkOverlapSize(squery_blk));
412
/* Sort to the canonical order, which the merge may not have done. */
413
for (i = 0; i < results2->num_queries; i++) {
414
BlastHitList *hitlist = results2->hitlist_array[i];
418
for (j = 0; j < hitlist->hsplist_count; j++)
419
Blast_HSPListSortByScore(hitlist->hsplist_array[j]);
422
stream2->results_sorted = FALSE;
425
fprintf(stderr, "new results: %d queries\n", results2->num_queries);
426
for (i = 0; i < results2->num_queries; i++) {
427
BlastHitList *hitlist = results2->hitlist_array[i];
431
for (j = 0; j < hitlist->hsplist_count; j++) {
432
BlastHSPList *hsplist = hitlist->hsplist_array[j];
434
"query %d OID %d\n", hsplist->query_index, hsplist->oid);
436
for (k = 0; k < hsplist->hspcnt; k++) {
437
BlastHSP *hsp = hsplist->hsp_array[k];
438
fprintf(stderr, "c %d q %d-%d s %d-%d score %d\n", hsp->context,
439
hsp->query.offset, hsp->query.end,
440
hsp->subject.offset, hsp->subject.end,
451
return kBlastHSPStream_Success;
454
/** Batch read function for this BlastHSPStream implementation.
455
* @param hsp_stream The BlastHSPStream object [in]
456
* @param batch List of HSP lists for the HSPStream to return. The caller
457
* acquires ownership of all HSP lists returned [out]
458
* @return kBlastHSPStream_Success on success, kBlastHSPStream_Error, or
459
* kBlastHSPStream_Eof on end of stream
461
int s_BlastHSPListCollectorBatchRead(BlastHSPStream* hsp_stream,
462
BlastHSPStreamResultBatch* batch)
467
BlastHSPList *hsplist;
468
BlastHSPListCollectorData* stream_data =
469
(BlastHSPListCollectorData*) GetData(hsp_stream);
471
batch->num_hsplists = 0;
472
if (!stream_data->results)
473
return kBlastHSPStream_Eof;
475
/* If this stream is not yet closed for writing, close it. In particular,
476
this includes sorting of results.
477
NB: to lift the prohibition on write after the first read, the
478
following 2 lines should be removed, and stream closure for writing
479
should be done outside of the read function. */
480
if (!stream_data->results_sorted)
481
s_BlastHSPListCollectorClose(hsp_stream);
483
/* return all the HSPlists with the same subject OID as the
484
last HSPList in the collection stored. We assume there is
485
at most one HSPList per query sequence */
487
num_hsplists = stream_data->num_hsplists;
488
if (num_hsplists == 0)
489
return kBlastHSPStream_Eof;
491
hsplist = stream_data->sorted_hsplists[num_hsplists - 1];
492
target_oid = hsplist->oid;
494
for (i = 0; i < num_hsplists; i++) {
495
hsplist = stream_data->sorted_hsplists[num_hsplists - 1 - i];
496
if (hsplist->oid != target_oid)
499
batch->hsplist_array[i] = hsplist;
502
stream_data->num_hsplists = num_hsplists - i;
503
batch->num_hsplists = i;
505
return kBlastHSPStream_Success;
508
/** Initialize function pointers and data structure in a collector HSP stream.
509
* @param hsp_stream The stream to initialize [in] [out]
510
* @param args Pointer to the collector data structure. [in]
511
* @return Filled HSP stream.
513
static BlastHSPStream*
514
s_BlastHSPListCollectorNew(BlastHSPStream* hsp_stream, void* args)
516
BlastHSPStreamFunctionPointerTypes fnptr;
518
fnptr.dtor = &s_BlastHSPListCollectorFree;
519
SetMethod(hsp_stream, eDestructor, fnptr);
520
fnptr.method = &s_BlastHSPListCollectorRead;
521
SetMethod(hsp_stream, eRead, fnptr);
522
fnptr.method = &s_BlastHSPListCollectorWrite;
523
SetMethod(hsp_stream, eWrite, fnptr);
524
fnptr.batch_read = &s_BlastHSPListCollectorBatchRead;
525
SetMethod(hsp_stream, eBatchRead, fnptr);
526
fnptr.mergeFn = &s_BlastHSPListCollectorMerge;
527
SetMethod(hsp_stream, eMerge, fnptr);
528
fnptr.closeFn = &s_BlastHSPListCollectorClose;
529
SetMethod(hsp_stream, eClose, fnptr);
531
SetData(hsp_stream, args);
536
Blast_HSPListCollectorInitMT(EBlastProgramType program,
537
SBlastHitsParameters* blasthit_params,
538
const BlastExtensionOptions* extn_opts,
539
Boolean sort_on_read,
540
Int4 num_queries, MT_LOCK lock)
542
BlastHSPListCollectorData* stream_data =
543
(BlastHSPListCollectorData*) malloc(sizeof(BlastHSPListCollectorData));
544
BlastHSPStreamNewInfo info;
546
stream_data->program = program;
547
stream_data->blasthit_params = blasthit_params;
549
stream_data->num_hsplists = 0;
550
stream_data->num_hsplists_alloc = 100;
551
stream_data->sorted_hsplists = (BlastHSPList **)malloc(
552
stream_data->num_hsplists_alloc *
553
sizeof(BlastHSPList *));
554
stream_data->results = Blast_HSPResultsNew(num_queries);
556
stream_data->results_sorted = FALSE;
558
/* This is needed to meet a pre-condition of the composition-based
560
if ((Blast_QueryIsProtein(program) || Blast_QueryIsPssm(program)) &&
561
extn_opts->compositionBasedStats != 0) {
562
stream_data->sort_by_score =
563
(SSortByScoreStruct*)calloc(1, sizeof(SSortByScoreStruct));
564
stream_data->sort_by_score->sort_on_read = sort_on_read;
565
stream_data->sort_by_score->first_query_index = 0;
567
stream_data->sort_by_score = NULL;
569
stream_data->x_lock = lock;
571
info.constructor = &s_BlastHSPListCollectorNew;
572
info.ctor_argument = (void*)stream_data;
574
return BlastHSPStreamNew(&info);
578
Blast_HSPListCollectorInit(EBlastProgramType program,
579
SBlastHitsParameters* blasthit_params,
580
const BlastExtensionOptions* extn_opts,
581
Boolean sort_on_read,
584
return Blast_HSPListCollectorInitMT(program, blasthit_params, extn_opts,
585
sort_on_read, num_queries, NULL);