24
24
* ===========================================================================
26
* Author: Christiam Camacho
26
* Author: Ilya Dondoshansky
30
30
/** @file blast_hspstream.c
31
* Definition of ADT to save and retrieve lists of HSPs in the BLAST engine.
31
* BlastHSPStream is used to save hits from preliminary stage and
32
* pass on to the traceback stage.
34
35
#ifndef SKIP_DOXYGEN_PROCESSING
35
36
static char const rcsid[] =
36
"$Id: blast_hspstream.c,v 1.9 2007/12/07 18:25:41 kazimird Exp $";
37
"$Id: blast_hspstream.c,v 1.12 2009/07/15 17:29:31 kazimird Exp $";
37
38
#endif /* SKIP_DOXYGEN_PROCESSING */
39
41
#include <algo/blast/core/blast_hspstream.h>
41
/** Complete type definition of Blast Hsp Stream ADT */
42
struct BlastHSPStream {
43
BlastHSPStreamConstructor NewFnPtr; /**< Constructor */
44
BlastHSPStreamDestructor DeleteFnPtr; /**< Destructor */
46
/* The operational interface */
48
BlastHSPStreamMethod WriteFnPtr; /**< Write to BlastHSPStream */
49
BlastHSPStreamMethod ReadFnPtr; /**< Read from BlastHSPStream */
50
BlastHSPStreamMergeFnType MergeFnPtr; /**< Merge two BlastHSPStreams */
51
BlastHSPStreamBatchReadMethod BatchReadFnPtr; /**< Batch read from
53
BlastHSPStreamCloseFnType CloseFnPtr; /**< Close BlastHSPStream for
55
void* DataStructure; /**< ADT holding HSPStream */
58
BlastHSPStream* BlastHSPStreamNew(const BlastHSPStreamNewInfo* bhsn_info)
60
BlastHSPStream* retval = NULL;
61
BlastHSPStreamFunctionPointerTypes fnptr;
63
if ( bhsn_info == NULL ) {
67
if ( !(retval = (BlastHSPStream*) calloc(1, sizeof(BlastHSPStream)))) {
71
/* Save the constructor and invoke it */
72
fnptr.ctor = bhsn_info->constructor;
73
SetMethod(retval, eConstructor, fnptr);
74
if (retval->NewFnPtr) {
75
retval = (*retval->NewFnPtr)(retval, bhsn_info->ctor_argument);
80
ASSERT(retval->DeleteFnPtr);
81
ASSERT(retval->WriteFnPtr);
82
ASSERT(retval->ReadFnPtr);
87
BlastHSPStream* BlastHSPStreamFree(BlastHSPStream* hsp_stream)
89
BlastHSPStreamDestructor destructor_fnptr = NULL;
92
return (BlastHSPStream*) NULL;
95
if ( !(destructor_fnptr = (*hsp_stream->DeleteFnPtr))) {
100
return (BlastHSPStream*) (*destructor_fnptr)(hsp_stream);
103
int BlastHSPStreamMerge(SSplitQueryBlk *squery_blk,
105
BlastHSPStream* hsp_stream,
106
BlastHSPStream* combined_hsp_stream)
108
BlastHSPStreamMergeFnType merge_fnptr = NULL;
110
if (!hsp_stream || !combined_hsp_stream)
111
return kBlastHSPStream_Error;
113
/* Make sure the input streams are compatible */
114
if (hsp_stream->MergeFnPtr != hsp_stream->MergeFnPtr)
115
return kBlastHSPStream_Error;
117
/** Merge functionality is optional. If merge function is not provided,
119
if ( !(merge_fnptr = (*hsp_stream->MergeFnPtr))) {
120
return kBlastHSPStream_Success;
123
return (*merge_fnptr)(squery_blk, chunk_num,
124
hsp_stream, combined_hsp_stream);
128
BlastHSPStreamResultBatch *
129
Blast_HSPStreamResultBatchInit(Int4 num_hsplists)
131
BlastHSPStreamResultBatch *retval = (BlastHSPStreamResultBatch *)
132
calloc(1, sizeof(BlastHSPStreamResultBatch));
134
retval->hsplist_array = (BlastHSPList **)calloc((size_t)num_hsplists,
135
sizeof(BlastHSPList *));
139
BlastHSPStreamResultBatch *
140
Blast_HSPStreamResultBatchFree(BlastHSPStreamResultBatch *batch)
143
sfree(batch->hsplist_array);
149
void Blast_HSPStreamResultBatchReset(BlastHSPStreamResultBatch *batch)
152
for (i = 0; i < batch->num_hsplists; i++) {
153
batch->hsplist_array[i] =
154
Blast_HSPListFree(batch->hsplist_array[i]);
156
batch->num_hsplists = 0;
159
int BlastHSPStreamBatchRead(BlastHSPStream* hsp_stream,
160
BlastHSPStreamResultBatch *batch)
162
BlastHSPStreamBatchReadMethod batch_read = NULL;
164
if (!hsp_stream || !batch)
165
return kBlastHSPStream_Error;
167
/** Batch read functionality is optional. If batch read
168
function is not provided, just do nothing. */
169
if ( !(batch_read = (*hsp_stream->BatchReadFnPtr))) {
170
return kBlastHSPStream_Success;
173
return (*batch_read)(hsp_stream, batch);
42
#include <algo/blast/core/blast_util.h>
44
/** Default hit saving stream methods */
46
/** Free the BlastHSPStream with its HSP list collector data structure.
47
* @param hsp_stream The HSP stream to free [in]
50
BlastHSPStream* BlastHSPStreamFree(BlastHSPStream* hsp_stream)
59
hsp_stream->x_lock = MT_LOCK_Delete(hsp_stream->x_lock);
60
Blast_HSPResultsFree(hsp_stream->results);
61
for (index=0; index < hsp_stream->num_hsplists; index++)
63
hsp_stream->sorted_hsplists[index] =
64
Blast_HSPListFree(hsp_stream->sorted_hsplists[index]);
66
sfree(hsp_stream->sort_by_score);
67
sfree(hsp_stream->sorted_hsplists);
69
if (hsp_stream->writer) {
70
(hsp_stream->writer->FreeFnPtr) (hsp_stream->writer);
71
hsp_stream->writer = NULL;
74
/* free un-used pipes */
75
while (hsp_stream->pre_pipe) {
76
p = hsp_stream->pre_pipe;
77
hsp_stream->pre_pipe = p->next;
80
while (hsp_stream->tback_pipe) {
81
p = hsp_stream->tback_pipe;
82
hsp_stream->tback_pipe = p->next;
90
/** callback used to sort HSP lists in order of decreasing OID
91
* @param x First HSP list [in]
92
* @param y Second HSP list [in]
93
* @return compare result
95
static int s_SortHSPListByOid(const void *x, const void *y)
97
BlastHSPList **xx = (BlastHSPList **)x;
98
BlastHSPList **yy = (BlastHSPList **)y;
99
return (*yy)->oid - (*xx)->oid;
102
/** Prohibit any future writing to the HSP stream when all results are written.
103
* Also perform sorting of results here to prepare them for reading.
104
* @param hsp_stream The HSP stream to close [in] [out]
176
106
void BlastHSPStreamClose(BlastHSPStream* hsp_stream)
178
BlastHSPStreamCloseFnType close_fnptr = NULL;
183
/** Close functionality is optional. If closing function is not provided,
185
if ( !(close_fnptr = (*hsp_stream->CloseFnPtr))) {
189
(*close_fnptr)(hsp_stream);
110
BlastHSPResults *results;
113
if (!hsp_stream || !hsp_stream->results || hsp_stream->results_sorted)
116
/* perform post-writer clean ups */
117
if (hsp_stream->writer) {
118
if (!hsp_stream->writer_initialized) {
119
/* some filter (e.g. hsp_queue) always needs finalization */
120
(hsp_stream->writer->InitFnPtr)
121
(hsp_stream->writer->data, hsp_stream->results);
123
(hsp_stream->writer->FinalFnPtr)
124
(hsp_stream->writer->data, hsp_stream->results);
127
/* apply preliminary stage pipes */
128
while (hsp_stream->pre_pipe) {
129
pipe = hsp_stream->pre_pipe;
130
hsp_stream->pre_pipe = pipe->next;
131
(pipe->RunFnPtr) (pipe->data, hsp_stream->results);
132
(pipe->FreeFnPtr) (pipe);
135
if (hsp_stream->sort_by_score) {
136
if (hsp_stream->sort_by_score->sort_on_read) {
137
Blast_HSPResultsReverseSort(hsp_stream->results);
139
/* Reverse the order of HSP lists, because they will be returned
140
starting from end, for the sake of convenience */
141
Blast_HSPResultsReverseOrder(hsp_stream->results);
143
hsp_stream->results_sorted = TRUE;
144
hsp_stream->x_lock = MT_LOCK_Delete(hsp_stream->x_lock);
148
results = hsp_stream->results;
149
num_hsplists = hsp_stream->num_hsplists;
151
/* concatenate all the HSPLists from 'results' */
153
for (i = 0; i < results->num_queries; i++) {
155
BlastHitList *hitlist = results->hitlist_array[i];
159
/* grow the list if necessary */
161
if (num_hsplists + hitlist->hsplist_count >
162
hsp_stream->num_hsplists_alloc) {
164
Int4 alloc = MAX(num_hsplists + hitlist->hsplist_count + 100,
165
2 * hsp_stream->num_hsplists_alloc);
166
hsp_stream->num_hsplists_alloc = alloc;
167
hsp_stream->sorted_hsplists = (BlastHSPList **)realloc(
168
hsp_stream->sorted_hsplists,
169
alloc * sizeof(BlastHSPList *));
172
for (j = k = 0; j < hitlist->hsplist_count; j++) {
174
BlastHSPList *hsplist = hitlist->hsplist_array[j];
178
hsplist->query_index = i;
179
hsp_stream->sorted_hsplists[num_hsplists + k] = hsplist;
183
hitlist->hsplist_count = 0;
187
/* sort in order of decreasing subject OID. HSPLists will be
188
read out from the end of hsplist_array later */
190
hsp_stream->num_hsplists = num_hsplists;
191
if (num_hsplists > 1) {
192
qsort(hsp_stream->sorted_hsplists, num_hsplists,
193
sizeof(BlastHSPList *), s_SortHSPListByOid);
196
hsp_stream->results_sorted = TRUE;
197
hsp_stream->x_lock = MT_LOCK_Delete(hsp_stream->x_lock);
200
/** Closing the HSP after traceback is done.
201
* This is mainly to provide a chance to apply post-traceback pipes.
202
* @param hsp_stream The HSP stream to close [in] [out]
203
* @param results The traceback results [in] [out]
205
void BlastHSPStreamTBackClose(BlastHSPStream* hsp_stream,
206
BlastHSPResults* results)
210
if (!hsp_stream || !results) {
214
/* apply traceback stage pipes */
215
while (hsp_stream->tback_pipe) {
216
pipe = hsp_stream->tback_pipe;
217
hsp_stream->tback_pipe = pipe->next;
218
(pipe->RunFnPtr) (pipe->data, results);
219
(pipe->FreeFnPtr) (pipe);
192
224
const int kBlastHSPStream_Error = -1;
193
225
const int kBlastHSPStream_Success = 0;
194
226
const int kBlastHSPStream_Eof = 1;
196
/** This method is akin to a vtable dispatcher, invoking the method registered
197
* upon creation of the implementation of the BlastHSPStream interface
228
/** Read one HSP list from the results saved in an HSP list collector. Once an
229
* HSP list is read from the stream, it relinquishes ownership and removes it
230
* from the internal results data structure.
231
* @param hsp_stream The HSP stream to read from [in]
232
* @param hsp_list_out The read HSP list. [out]
233
* @return Success, error, or end of reading, when nothing left to read.
235
int BlastHSPStreamRead(BlastHSPStream* hsp_stream, BlastHSPList** hsp_list_out)
237
*hsp_list_out = NULL;
240
return kBlastHSPStream_Error;
242
if (!hsp_stream->results)
243
return kBlastHSPStream_Eof;
245
/* If this stream is not yet closed for writing, close it. In particular,
246
this includes sorting of results.
247
NB: to lift the prohibition on write after the first read, the
248
following 2 lines should be removed, and stream closure for writing
249
should be done outside of the read function. */
250
if (!hsp_stream->results_sorted)
251
BlastHSPStreamClose(hsp_stream);
253
if (hsp_stream->sort_by_score) {
254
Int4 last_hsplist_index = -1, index = 0;
255
BlastHitList* hit_list = NULL;
256
BlastHSPResults* results = hsp_stream->results;
258
/* Find index of the first query that has results. */
259
for (index = hsp_stream->sort_by_score->first_query_index;
260
index < results->num_queries; ++index) {
261
if (results->hitlist_array[index] &&
262
results->hitlist_array[index]->hsplist_count > 0)
265
if (index >= results->num_queries)
266
return kBlastHSPStream_Eof;
268
hsp_stream->sort_by_score->first_query_index = index;
270
hit_list = results->hitlist_array[index];
271
last_hsplist_index = hit_list->hsplist_count - 1;
273
*hsp_list_out = hit_list->hsplist_array[last_hsplist_index];
274
/* Assign the query index here so the caller knows which query this HSP
276
(*hsp_list_out)->query_index = index;
277
/* Dequeue this HSP list by decrementing the HSPList count */
278
--hit_list->hsplist_count;
279
if (hit_list->hsplist_count == 0) {
280
/* Advance the first query index, without checking that the next
281
* query has results - that will be done on the next call. */
282
++hsp_stream->sort_by_score->first_query_index;
285
/* return the next HSPlist out of the collection stored */
287
if (!hsp_stream->num_hsplists)
288
return kBlastHSPStream_Eof;
291
hsp_stream->sorted_hsplists[--hsp_stream->num_hsplists];
294
return kBlastHSPStream_Success;
297
/** Write an HSP list to the collector HSP stream. The HSP stream assumes
298
* ownership of the HSP list and sets the dereferenced pointer to NULL.
299
* @param hsp_stream Stream to write to. [in] [out]
300
* @param hsp_list Pointer to the HSP list to save in the collector. [in]
301
* @return Success or error, if stream is already closed for writing.
303
int BlastHSPStreamWrite(BlastHSPStream* hsp_stream, BlastHSPList** hsp_list)
308
return kBlastHSPStream_Error;
310
/** Lock the mutex, if necessary */
311
MT_LOCK_Do(hsp_stream->x_lock, eMT_Lock);
313
/** Prohibit writing after reading has already started. This prohibition
314
* can be lifted later. There is no inherent problem in using read and
315
* write in any order, except that sorting would have to be done on
316
* every read after a write.
318
if (hsp_stream->results_sorted) {
319
MT_LOCK_Do(hsp_stream->x_lock, eMT_Unlock);
320
return kBlastHSPStream_Error;
323
if (hsp_stream->writer) {
324
/** if writer has not been initialized, initialize it first */
325
if (!(hsp_stream->writer_initialized)) {
326
(hsp_stream->writer->InitFnPtr)
327
(hsp_stream->writer->data, hsp_stream->results);
328
hsp_stream->writer_initialized = TRUE;
331
/** filtering processing */
332
status = (hsp_stream->writer->RunFnPtr)
333
(hsp_stream->writer->data, *hsp_list);
337
MT_LOCK_Do(hsp_stream->x_lock, eMT_Unlock);
338
return kBlastHSPStream_Error;
340
/* Results structure is no longer sorted, even if it was before.
341
The following assignment is only necessary if the logic to prohibit
342
writing after the first read is removed. */
343
hsp_stream->results_sorted = FALSE;
345
/* Free the caller from this pointer's ownership. */
348
/** Unlock the mutex */
349
MT_LOCK_Do(hsp_stream->x_lock, eMT_Unlock);
351
return kBlastHSPStream_Success;
354
/* #define _DEBUG_VERBOSE 1 */
355
/** Merge two HSPStreams. The HSPs from the first stream are
356
* moved to the second stream.
357
* @param squery_blk Structure controlling the merge process [in]
358
* @param chunk_num Unique integer assigned to hsp_stream [in]
359
* @param stream1 The stream to merge [in][out]
360
* @param stream2 The stream that will contain the
361
* HSPLists of the first stream [in][out]
363
int BlastHSPStreamMerge(SSplitQueryBlk *squery_blk,
365
BlastHSPStream* stream1,
366
BlastHSPStream* stream2)
369
BlastHSPResults *results1 = NULL;
370
BlastHSPResults *results2 = NULL;
371
Int4 contexts_per_query = 0;
373
Int4 num_queries = 0, num_ctx = 0, num_ctx_offsets = 0;
377
Uint4 *query_list = NULL, *offset_list = NULL, num_contexts = 0;
378
Int4 *context_list = NULL;
381
if (!stream1 || !stream2)
382
return kBlastHSPStream_Error;
384
results1 = stream1->results;
385
results2 = stream2->results;
386
contexts_per_query = BLAST_GetNumberOfContexts(stream2->program);
388
SplitQueryBlk_GetQueryIndicesForChunk(squery_blk, chunk_num, &query_list);
389
SplitQueryBlk_GetQueryContextsForChunk(squery_blk, chunk_num,
390
&context_list, &num_contexts);
391
SplitQueryBlk_GetContextOffsetsForChunk(squery_blk, chunk_num, &offset_list);
393
#if defined(_DEBUG_VERBOSE)
394
fprintf(stderr, "Chunk %d\n", chunk_num);
395
fprintf(stderr, "Queries : ");
396
for (num_queries = 0; query_list[num_queries] != UINT4_MAX; num_queries++)
397
fprintf(stderr, "%d ", query_list[num_queries]);
398
fprintf(stderr, "\n");
399
fprintf(stderr, "Contexts : ");
400
for (num_ctx = 0; num_ctx < num_contexts; num_ctx++)
401
fprintf(stderr, "%d ", context_list[num_ctx]);
402
fprintf(stderr, "\n");
403
fprintf(stderr, "Context starting offsets : ");
404
for (num_ctx_offsets = 0; offset_list[num_ctx_offsets] != UINT4_MAX;
406
fprintf(stderr, "%d ", offset_list[num_ctx_offsets]);
407
fprintf(stderr, "\n");
408
#elif defined(_DEBUG)
409
for (num_queries = 0; query_list[num_queries] != UINT4_MAX; num_queries++) ;
410
for (num_ctx = 0, max_ctx = INT4_MIN; num_ctx < num_contexts; num_ctx++)
411
max_ctx = MAX(max_ctx, context_list[num_ctx]);
412
for (num_ctx_offsets = 0; offset_list[num_ctx_offsets] != UINT4_MAX;
416
for (i = 0; i < results1->num_queries; i++) {
417
BlastHitList *hitlist = results1->hitlist_array[i];
418
Int4 global_query = query_list[i];
419
Int4 split_points[NUM_FRAMES];
421
ASSERT(i < num_queries);
424
if (hitlist == NULL) {
425
#if defined(_DEBUG_VERBOSE)
426
fprintf(stderr, "No hits to query %d\n", global_query);
431
/* we will be mapping HSPs from the local context to
432
their place on the unsplit concatenated query. Once
433
that's done, overlapping HSPs need to get merged, and
434
to do that we must know the offset within each context
435
where the last chunk ended and the current chunk begins */
436
for (j = 0; j < contexts_per_query; j++) {
437
split_points[j] = -1;
440
for (j = 0; j < contexts_per_query; j++) {
441
Int4 local_context = i * contexts_per_query + j;
442
if (context_list[local_context] >= 0) {
443
split_points[context_list[local_context] % contexts_per_query] =
444
offset_list[local_context];
448
#if defined(_DEBUG_VERBOSE)
449
fprintf(stderr, "query %d split points: ", i);
450
for (j = 0; j < contexts_per_query; j++) {
451
fprintf(stderr, "%d ", split_points[j]);
453
fprintf(stderr, "\n");
456
for (j = 0; j < hitlist->hsplist_count; j++) {
457
BlastHSPList *hsplist = hitlist->hsplist_array[j];
459
for (k = 0; k < hsplist->hspcnt; k++) {
460
BlastHSP *hsp = hsplist->hsp_array[k];
461
Int4 local_context = hsp->context;
463
ASSERT(local_context <= max_ctx);
464
ASSERT(local_context < num_ctx);
465
ASSERT(local_context < num_ctx_offsets);
468
hsp->context = context_list[local_context];
469
hsp->query.offset += offset_list[local_context];
470
hsp->query.end += offset_list[local_context];
471
hsp->query.gapped_start += offset_list[local_context];
472
hsp->query.frame = BLAST_ContextToFrame(stream2->program,
476
hsplist->query_index = global_query;
479
Blast_HitListMerge(results1->hitlist_array + i,
480
results2->hitlist_array + global_query,
481
contexts_per_query, split_points,
482
SplitQueryBlk_GetChunkOverlapSize(squery_blk));
485
/* Sort to the canonical order, which the merge may not have done. */
486
for (i = 0; i < results2->num_queries; i++) {
487
BlastHitList *hitlist = results2->hitlist_array[i];
491
for (j = 0; j < hitlist->hsplist_count; j++)
492
Blast_HSPListSortByScore(hitlist->hsplist_array[j]);
495
stream2->results_sorted = FALSE;
498
fprintf(stderr, "new results: %d queries\n", results2->num_queries);
499
for (i = 0; i < results2->num_queries; i++) {
500
BlastHitList *hitlist = results2->hitlist_array[i];
504
for (j = 0; j < hitlist->hsplist_count; j++) {
505
BlastHSPList *hsplist = hitlist->hsplist_array[j];
507
"query %d OID %d\n", hsplist->query_index, hsplist->oid);
509
for (k = 0; k < hsplist->hspcnt; k++) {
510
BlastHSP *hsp = hsplist->hsp_array[k];
511
fprintf(stderr, "c %d q %d-%d s %d-%d score %d\n", hsp->context,
512
hsp->query.offset, hsp->query.end,
513
hsp->subject.offset, hsp->subject.end,
524
return kBlastHSPStream_Success;
527
/** Batch read function for this BlastHSPStream implementation.
198
528
* @param hsp_stream The BlastHSPStream object [in]
199
* @param name Name of the method to invoke on hsp_stream [in]
200
* @param hsp_list HSP list to work with [in] [out]
201
* @return kBlastHSPStream_Error on NULL hsp_stream or NULL method pointer
202
* (i.e.: unimplemented or uninitialized method on the BlastHSPStream
203
* interface) or return value of the implementation.
529
* @param batch List of HSP lists for the HSPStream to return. The caller
530
* acquires ownership of all HSP lists returned [out]
531
* @return kBlastHSPStream_Success on success, kBlastHSPStream_Error, or
532
* kBlastHSPStream_Eof on end of stream
206
_MethodDispatcher(BlastHSPStream* hsp_stream, EMethodName name,
207
BlastHSPList** hsp_list)
209
BlastHSPStreamMethod method_fnptr = NULL;
212
return kBlastHSPStream_Error;
215
ASSERT(name < eMethodBoundary);
219
method_fnptr = (*hsp_stream->ReadFnPtr);
223
method_fnptr = (*hsp_stream->WriteFnPtr);
227
abort(); /* should never happen */
231
return kBlastHSPStream_Error;
234
return (*method_fnptr)(hsp_stream, hsp_list);
237
int BlastHSPStreamRead(BlastHSPStream* hsp_stream, BlastHSPList** hsp_list)
239
return _MethodDispatcher(hsp_stream, eRead, hsp_list);
242
int BlastHSPStreamWrite(BlastHSPStream* hsp_stream, BlastHSPList** hsp_list)
244
return _MethodDispatcher(hsp_stream, eWrite, hsp_list);
247
/*****************************************************************************/
249
void* GetData(BlastHSPStream* hsp_stream)
255
return hsp_stream->DataStructure;
258
int SetData(BlastHSPStream* hsp_stream, void* data)
261
return kBlastHSPStream_Error;
264
hsp_stream->DataStructure = data;
266
return kBlastHSPStream_Success;
269
int SetMethod(BlastHSPStream* hsp_stream,
271
BlastHSPStreamFunctionPointerTypes fnptr_type)
274
return kBlastHSPStream_Error;
277
ASSERT(name < eMethodBoundary);
281
hsp_stream->NewFnPtr = fnptr_type.ctor;
285
hsp_stream->DeleteFnPtr = fnptr_type.dtor;
289
hsp_stream->ReadFnPtr = fnptr_type.method;
293
hsp_stream->BatchReadFnPtr = fnptr_type.batch_read;
297
hsp_stream->WriteFnPtr = fnptr_type.method;
301
hsp_stream->MergeFnPtr = fnptr_type.mergeFn;
305
hsp_stream->CloseFnPtr = fnptr_type.closeFn;
309
abort(); /* should never happen */
312
return kBlastHSPStream_Success;
534
int BlastHSPStreamBatchRead(BlastHSPStream* hsp_stream,
535
BlastHSPStreamResultBatch* batch)
540
BlastHSPList *hsplist;
542
if (!hsp_stream || !batch)
543
return kBlastHSPStream_Error;
545
batch->num_hsplists = 0;
546
if (!hsp_stream->results)
547
return kBlastHSPStream_Eof;
549
/* If this stream is not yet closed for writing, close it. In particular,
550
this includes sorting of results.
551
NB: to lift the prohibition on write after the first read, the
552
following 2 lines should be removed, and stream closure for writing
553
should be done outside of the read function. */
554
if (!hsp_stream->results_sorted)
555
BlastHSPStreamClose(hsp_stream);
557
/* return all the HSPlists with the same subject OID as the
558
last HSPList in the collection stored. We assume there is
559
at most one HSPList per query sequence */
561
num_hsplists = hsp_stream->num_hsplists;
562
if (num_hsplists == 0)
563
return kBlastHSPStream_Eof;
565
hsplist = hsp_stream->sorted_hsplists[num_hsplists - 1];
566
target_oid = hsplist->oid;
568
for (i = 0; i < num_hsplists; i++) {
569
hsplist = hsp_stream->sorted_hsplists[num_hsplists - 1 - i];
570
if (hsplist->oid != target_oid)
573
batch->hsplist_array[i] = hsplist;
576
hsp_stream->num_hsplists = num_hsplists - i;
577
batch->num_hsplists = i;
579
return kBlastHSPStream_Success;
582
BlastHSPStreamResultBatch *
583
Blast_HSPStreamResultBatchInit(Int4 num_hsplists)
585
BlastHSPStreamResultBatch *retval = (BlastHSPStreamResultBatch *)
586
calloc(1, sizeof(BlastHSPStreamResultBatch));
588
retval->hsplist_array = (BlastHSPList **)calloc((size_t)num_hsplists,
589
sizeof(BlastHSPList *));
593
BlastHSPStreamResultBatch *
594
Blast_HSPStreamResultBatchFree(BlastHSPStreamResultBatch *batch)
597
sfree(batch->hsplist_array);
603
void Blast_HSPStreamResultBatchReset(BlastHSPStreamResultBatch *batch)
606
for (i = 0; i < batch->num_hsplists; i++) {
607
batch->hsplist_array[i] =
608
Blast_HSPListFree(batch->hsplist_array[i]);
610
batch->num_hsplists = 0;
614
BlastHSPStreamNew(EBlastProgramType program,
615
const BlastExtensionOptions* extn_opts,
616
Boolean sort_on_read,
618
BlastHSPWriter *writer)
620
BlastHSPStream* hsp_stream =
621
(BlastHSPStream*) malloc(sizeof(BlastHSPStream));
623
hsp_stream->program = program;
625
hsp_stream->num_hsplists = 0;
626
hsp_stream->num_hsplists_alloc = 100;
627
hsp_stream->sorted_hsplists = (BlastHSPList **)malloc(
628
hsp_stream->num_hsplists_alloc *
629
sizeof(BlastHSPList *));
630
hsp_stream->results = Blast_HSPResultsNew(num_queries);
632
hsp_stream->results_sorted = FALSE;
634
/* This is needed to meet a pre-condition of the composition-based
636
if ((Blast_QueryIsProtein(program) || Blast_QueryIsPssm(program)) &&
637
extn_opts->compositionBasedStats != 0) {
638
hsp_stream->sort_by_score =
639
(SSortByScoreStruct*)calloc(1, sizeof(SSortByScoreStruct));
640
hsp_stream->sort_by_score->sort_on_read = sort_on_read;
641
hsp_stream->sort_by_score->first_query_index = 0;
643
hsp_stream->sort_by_score = NULL;
645
hsp_stream->x_lock = NULL;
646
hsp_stream->writer = writer;
647
hsp_stream->writer_initialized = FALSE;
648
hsp_stream->pre_pipe = NULL;
649
hsp_stream->tback_pipe = NULL;
654
int BlastHSPStreamRegisterMTLock(BlastHSPStream* hsp_stream,
657
/* only one lock can be registered */
658
if (!hsp_stream || (hsp_stream->x_lock && lock)) {
659
MT_LOCK_Delete(lock);
662
hsp_stream->x_lock = lock;
666
int BlastHSPStreamRegisterPipe(BlastHSPStream* hsp_stream,
672
if (!hsp_stream || !pipe) {
680
p = hsp_stream->pre_pipe;
682
hsp_stream->pre_pipe = pipe;
686
case eTracebackSearch:
687
p = hsp_stream->tback_pipe;
689
hsp_stream->tback_pipe = pipe;
697
/* insert the pipe at the end */
698
for (; p && p->next; p = p->next);
705
BlastHSPWriterNew (BlastHSPWriterInfo** writer_info,
706
BlastQueryInfo* query_info)
708
BlastHSPWriter * writer = NULL;
709
if(writer_info && *writer_info) {
710
writer = ((*writer_info)->NewFnPtr) ((*writer_info)->params, query_info);
713
ASSERT(writer_info && *writer_info == NULL);
718
BlastHSPPipeInfo_Add(BlastHSPPipeInfo** head,
719
BlastHSPPipeInfo* node)
723
BlastHSPPipeInfo* tmp = *head;
736
BlastHSPPipeNew (BlastHSPPipeInfo** pipe_info,
737
BlastQueryInfo* query_info)
739
BlastHSPPipe *pipe = NULL;
740
BlastHSPPipe *p = pipe;
741
BlastHSPPipeInfo *info = *pipe_info;
742
BlastHSPPipeInfo *q = info;
746
p->next = (info->NewFnPtr) (info->params, query_info);
749
pipe = (info->NewFnPtr) (info->params, query_info);