~ubuntu-branches/ubuntu/saucy/ncbi-tools6/saucy-proposed

« back to all changes in this revision

Viewing changes to algo/blast/core/blast_hspstream.c

  • Committer: Bazaar Package Importer
  • Author(s): Aaron M. Ucko
  • Date: 2009-08-11 22:03:47 UTC
  • mfrom: (1.4.1 upstream)
  • mto: This revision was merged to the branch mainline in revision 10.
  • Revision ID: james.westby@ubuntu.com-20090811220347-g4b6lzdvphvvbpiu
* New upstream release.
* debian/libncbi6.symbols: update accordingly.
* debian/control: clean up obsolete or redundant relationship declarations.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*  $Id: blast_hspstream.c,v 1.9 2007/12/07 18:25:41 kazimird Exp $
 
1
/*  $Id: blast_hspstream.c,v 1.12 2009/07/15 17:29:31 kazimird Exp $
2
2
 * ===========================================================================
3
3
 *
4
4
 *                            PUBLIC DOMAIN NOTICE
23
23
 *
24
24
 * ===========================================================================
25
25
 *
26
 
 * Author:  Christiam Camacho
 
26
 * Author:  Ilya Dondoshansky
27
27
 *
28
28
 */
29
29
 
30
30
/** @file blast_hspstream.c
31
 
 * Definition of ADT to save and retrieve lists of HSPs in the BLAST engine.
 
31
 * BlastHSPStream is used to save hits from preliminary stage and 
 
32
 * pass on to the traceback stage.
32
33
 */
33
34
 
34
35
#ifndef SKIP_DOXYGEN_PROCESSING
35
36
static char const rcsid[] = 
36
 
    "$Id: blast_hspstream.c,v 1.9 2007/12/07 18:25:41 kazimird Exp $";
 
37
    "$Id: blast_hspstream.c,v 1.12 2009/07/15 17:29:31 kazimird Exp $";
37
38
#endif /* SKIP_DOXYGEN_PROCESSING */
38
39
 
 
40
 
39
41
#include <algo/blast/core/blast_hspstream.h>
40
 
 
41
 
/** Complete type definition of Blast Hsp Stream ADT */
42
 
struct BlastHSPStream {
43
 
   BlastHSPStreamConstructor NewFnPtr;    /**< Constructor */
44
 
   BlastHSPStreamDestructor  DeleteFnPtr; /**< Destructor */
45
 
   
46
 
   /* The operational interface */
47
 
   
48
 
   BlastHSPStreamMethod      WriteFnPtr;  /**< Write to BlastHSPStream */
49
 
   BlastHSPStreamMethod      ReadFnPtr;   /**< Read from BlastHSPStream */
50
 
   BlastHSPStreamMergeFnType MergeFnPtr;  /**< Merge two BlastHSPStreams */
51
 
   BlastHSPStreamBatchReadMethod  BatchReadFnPtr;  /**< Batch read from 
52
 
                                                     BlastHSPStream */
53
 
   BlastHSPStreamCloseFnType CloseFnPtr;  /**< Close BlastHSPStream for
54
 
                                             writing */
55
 
   void* DataStructure;                   /**< ADT holding HSPStream */
56
 
};
57
 
 
58
 
BlastHSPStream* BlastHSPStreamNew(const BlastHSPStreamNewInfo* bhsn_info)
59
 
{
60
 
    BlastHSPStream* retval = NULL;
61
 
    BlastHSPStreamFunctionPointerTypes fnptr;
62
 
 
63
 
    if ( bhsn_info == NULL ) {
64
 
        return NULL;
65
 
    }
66
 
 
67
 
    if ( !(retval = (BlastHSPStream*) calloc(1, sizeof(BlastHSPStream)))) {
68
 
        return NULL;
69
 
    }
70
 
 
71
 
    /* Save the constructor and invoke it */
72
 
    fnptr.ctor = bhsn_info->constructor;
73
 
    SetMethod(retval, eConstructor, fnptr);
74
 
    if (retval->NewFnPtr) {
75
 
        retval = (*retval->NewFnPtr)(retval, bhsn_info->ctor_argument);
76
 
    } else {
77
 
        sfree(retval);
78
 
    }
79
 
 
80
 
    ASSERT(retval->DeleteFnPtr);
81
 
    ASSERT(retval->WriteFnPtr);
82
 
    ASSERT(retval->ReadFnPtr);
83
 
 
84
 
    return retval;
85
 
}
86
 
 
87
 
BlastHSPStream* BlastHSPStreamFree(BlastHSPStream* hsp_stream)
88
 
{
89
 
    BlastHSPStreamDestructor destructor_fnptr = NULL;
90
 
 
91
 
    if (!hsp_stream) {
92
 
        return (BlastHSPStream*) NULL;
93
 
    }
94
 
 
95
 
    if ( !(destructor_fnptr = (*hsp_stream->DeleteFnPtr))) {
96
 
        sfree(hsp_stream);
97
 
        return NULL;
98
 
    }
99
 
 
100
 
    return (BlastHSPStream*) (*destructor_fnptr)(hsp_stream);
101
 
}
102
 
 
103
 
int BlastHSPStreamMerge(SSplitQueryBlk *squery_blk,
104
 
                        Uint4 chunk_num,
105
 
                        BlastHSPStream* hsp_stream,
106
 
                        BlastHSPStream* combined_hsp_stream)
107
 
{
108
 
    BlastHSPStreamMergeFnType merge_fnptr = NULL;
109
 
 
110
 
    if (!hsp_stream || !combined_hsp_stream)
111
 
       return kBlastHSPStream_Error;
112
 
 
113
 
    /* Make sure the input streams are compatible */
114
 
    if (hsp_stream->MergeFnPtr != hsp_stream->MergeFnPtr)
115
 
       return kBlastHSPStream_Error;
116
 
 
117
 
    /** Merge functionality is optional. If merge function is not provided,
118
 
        just do nothing. */
119
 
    if ( !(merge_fnptr = (*hsp_stream->MergeFnPtr))) {
120
 
       return kBlastHSPStream_Success;
121
 
    }
122
 
 
123
 
    return (*merge_fnptr)(squery_blk, chunk_num, 
124
 
                          hsp_stream, combined_hsp_stream);
125
 
}
126
 
 
127
 
 
128
 
BlastHSPStreamResultBatch * 
129
 
Blast_HSPStreamResultBatchInit(Int4 num_hsplists)
130
 
{
131
 
    BlastHSPStreamResultBatch *retval = (BlastHSPStreamResultBatch *)
132
 
                             calloc(1, sizeof(BlastHSPStreamResultBatch));
133
 
 
134
 
    retval->hsplist_array = (BlastHSPList **)calloc((size_t)num_hsplists,
135
 
                                               sizeof(BlastHSPList *));
136
 
    return retval;
137
 
}
138
 
 
139
 
BlastHSPStreamResultBatch * 
140
 
Blast_HSPStreamResultBatchFree(BlastHSPStreamResultBatch *batch)
141
 
{
142
 
    if (batch != NULL) {
143
 
        sfree(batch->hsplist_array);
144
 
        sfree(batch);
145
 
    }
146
 
    return NULL;
147
 
}
148
 
 
149
 
void Blast_HSPStreamResultBatchReset(BlastHSPStreamResultBatch *batch)
150
 
{
151
 
    Int4 i;
152
 
    for (i = 0; i < batch->num_hsplists; i++) {
153
 
        batch->hsplist_array[i] = 
154
 
           Blast_HSPListFree(batch->hsplist_array[i]);
155
 
    }
156
 
    batch->num_hsplists = 0;
157
 
}
158
 
 
159
 
int BlastHSPStreamBatchRead(BlastHSPStream* hsp_stream,
160
 
                            BlastHSPStreamResultBatch *batch)
161
 
{
162
 
    BlastHSPStreamBatchReadMethod batch_read = NULL;
163
 
 
164
 
    if (!hsp_stream || !batch)
165
 
       return kBlastHSPStream_Error;
166
 
 
167
 
    /** Batch read functionality is optional. If batch read 
168
 
        function is not provided, just do nothing. */
169
 
    if ( !(batch_read = (*hsp_stream->BatchReadFnPtr))) {
170
 
       return kBlastHSPStream_Success;
171
 
    }
172
 
 
173
 
    return (*batch_read)(hsp_stream, batch);
174
 
}
175
 
 
 
42
#include <algo/blast/core/blast_util.h>
 
43
 
 
44
/** Default hit saving stream methods */
 
45
 
 
46
/** Free the BlastHSPStream with its HSP list collector data structure.
 
47
 * @param hsp_stream The HSP stream to free [in]
 
48
 * @return NULL.
 
49
 */
 
50
BlastHSPStream* BlastHSPStreamFree(BlastHSPStream* hsp_stream) 
 
51
{
 
52
   int index=0;
 
53
   BlastHSPPipe *p;
 
54
 
 
55
   if (!hsp_stream) {
 
56
       return NULL;
 
57
   }
 
58
 
 
59
   hsp_stream->x_lock = MT_LOCK_Delete(hsp_stream->x_lock);
 
60
   Blast_HSPResultsFree(hsp_stream->results);
 
61
   for (index=0; index < hsp_stream->num_hsplists; index++)
 
62
   {
 
63
        hsp_stream->sorted_hsplists[index] =
 
64
            Blast_HSPListFree(hsp_stream->sorted_hsplists[index]);
 
65
   }
 
66
   sfree(hsp_stream->sort_by_score);
 
67
   sfree(hsp_stream->sorted_hsplists);
 
68
   
 
69
   if (hsp_stream->writer) {
 
70
       (hsp_stream->writer->FreeFnPtr) (hsp_stream->writer);
 
71
       hsp_stream->writer = NULL;
 
72
   }
 
73
 
 
74
   /* free un-used pipes */
 
75
   while (hsp_stream->pre_pipe) {
 
76
       p = hsp_stream->pre_pipe;
 
77
       hsp_stream->pre_pipe = p->next;
 
78
       sfree(p);
 
79
   }
 
80
   while (hsp_stream->tback_pipe) {
 
81
       p = hsp_stream->tback_pipe;
 
82
       hsp_stream->tback_pipe = p->next;
 
83
       sfree(p);
 
84
   }
 
85
       
 
86
   sfree(hsp_stream);
 
87
   return NULL;
 
88
}
 
89
 
 
90
/** callback used to sort HSP lists in order of decreasing OID
 
91
 * @param x First HSP list [in]
 
92
 * @param y Second HSP list [in]
 
93
 * @return compare result
 
94
 */           
 
95
static int s_SortHSPListByOid(const void *x, const void *y)
 
96
{   
 
97
        BlastHSPList **xx = (BlastHSPList **)x;
 
98
            BlastHSPList **yy = (BlastHSPList **)y;
 
99
                return (*yy)->oid - (*xx)->oid;
 
100
}
 
101
 
 
102
/** Prohibit any future writing to the HSP stream when all results are written.
 
103
 * Also perform sorting of results here to prepare them for reading.
 
104
 * @param hsp_stream The HSP stream to close [in] [out]
 
105
 */ 
176
106
void BlastHSPStreamClose(BlastHSPStream* hsp_stream)
177
107
{
178
 
    BlastHSPStreamCloseFnType close_fnptr = NULL;
179
 
 
180
 
    if (!hsp_stream)
181
 
       return;
182
 
 
183
 
    /** Close functionality is optional. If closing function is not provided,
184
 
        just do nothing. */
185
 
    if ( !(close_fnptr = (*hsp_stream->CloseFnPtr))) {
186
 
       return;
187
 
    }
188
 
 
189
 
    (*close_fnptr)(hsp_stream);
 
108
   Int4 i, j, k;
 
109
   Int4 num_hsplists;
 
110
   BlastHSPResults *results;
 
111
   BlastHSPPipe *pipe;
 
112
 
 
113
   if (!hsp_stream || !hsp_stream->results || hsp_stream->results_sorted)
 
114
      return;
 
115
 
 
116
   /* perform post-writer clean ups */
 
117
   if (hsp_stream->writer) {
 
118
       if (!hsp_stream->writer_initialized) {
 
119
           /* some filter (e.g. hsp_queue) always needs finalization */
 
120
           (hsp_stream->writer->InitFnPtr)
 
121
                (hsp_stream->writer->data, hsp_stream->results);
 
122
       }
 
123
       (hsp_stream->writer->FinalFnPtr)
 
124
            (hsp_stream->writer->data, hsp_stream->results);
 
125
   }
 
126
 
 
127
   /* apply preliminary stage pipes */
 
128
   while (hsp_stream->pre_pipe) {
 
129
       pipe = hsp_stream->pre_pipe;
 
130
       hsp_stream->pre_pipe = pipe->next;
 
131
       (pipe->RunFnPtr) (pipe->data, hsp_stream->results);
 
132
       (pipe->FreeFnPtr) (pipe);
 
133
   }
 
134
 
 
135
   if (hsp_stream->sort_by_score) {
 
136
       if (hsp_stream->sort_by_score->sort_on_read) {
 
137
           Blast_HSPResultsReverseSort(hsp_stream->results);
 
138
       } else {
 
139
           /* Reverse the order of HSP lists, because they will be returned
 
140
              starting from end, for the sake of convenience */
 
141
           Blast_HSPResultsReverseOrder(hsp_stream->results);
 
142
       }
 
143
       hsp_stream->results_sorted = TRUE;
 
144
       hsp_stream->x_lock = MT_LOCK_Delete(hsp_stream->x_lock);
 
145
       return;
 
146
   }
 
147
 
 
148
   results = hsp_stream->results;
 
149
   num_hsplists = hsp_stream->num_hsplists;
 
150
 
 
151
   /* concatenate all the HSPLists from 'results' */
 
152
 
 
153
   for (i = 0; i < results->num_queries; i++) {
 
154
 
 
155
       BlastHitList *hitlist = results->hitlist_array[i];
 
156
       if (hitlist == NULL)
 
157
           continue;
 
158
 
 
159
       /* grow the list if necessary */
 
160
 
 
161
       if (num_hsplists + hitlist->hsplist_count > 
 
162
                                hsp_stream->num_hsplists_alloc) {
 
163
 
 
164
           Int4 alloc = MAX(num_hsplists + hitlist->hsplist_count + 100,
 
165
                            2 * hsp_stream->num_hsplists_alloc);
 
166
           hsp_stream->num_hsplists_alloc = alloc;
 
167
           hsp_stream->sorted_hsplists = (BlastHSPList **)realloc(
 
168
                                         hsp_stream->sorted_hsplists,
 
169
                                         alloc * sizeof(BlastHSPList *));
 
170
       }
 
171
 
 
172
       for (j = k = 0; j < hitlist->hsplist_count; j++) {
 
173
 
 
174
           BlastHSPList *hsplist = hitlist->hsplist_array[j];
 
175
           if (hsplist == NULL)
 
176
               continue;
 
177
 
 
178
           hsplist->query_index = i;
 
179
           hsp_stream->sorted_hsplists[num_hsplists + k] = hsplist;
 
180
           k++;
 
181
       }
 
182
 
 
183
       hitlist->hsplist_count = 0;
 
184
       num_hsplists += k;
 
185
   }
 
186
 
 
187
   /* sort in order of decreasing subject OID. HSPLists will be
 
188
      read out from the end of hsplist_array later */
 
189
 
 
190
   hsp_stream->num_hsplists = num_hsplists;
 
191
   if (num_hsplists > 1) {
 
192
      qsort(hsp_stream->sorted_hsplists, num_hsplists, 
 
193
                    sizeof(BlastHSPList *), s_SortHSPListByOid);
 
194
   }
 
195
 
 
196
   hsp_stream->results_sorted = TRUE;
 
197
   hsp_stream->x_lock = MT_LOCK_Delete(hsp_stream->x_lock);
 
198
}
 
199
 
 
200
/** Closing the HSP after traceback is done.
 
201
 * This is mainly to provide a chance to apply post-traceback pipes.
 
202
 * @param hsp_stream The HSP stream to close [in] [out]
 
203
 * @param results The traceback results [in] [out]
 
204
 */ 
 
205
void BlastHSPStreamTBackClose(BlastHSPStream* hsp_stream, 
 
206
                              BlastHSPResults* results)
 
207
{
 
208
   BlastHSPPipe *pipe;
 
209
 
 
210
   if (!hsp_stream || !results) {
 
211
       return;
 
212
   }
 
213
 
 
214
   /* apply traceback stage pipes */
 
215
   while (hsp_stream->tback_pipe) {
 
216
       pipe = hsp_stream->tback_pipe;
 
217
       hsp_stream->tback_pipe = pipe->next;
 
218
       (pipe->RunFnPtr) (pipe->data, results);
 
219
       (pipe->FreeFnPtr) (pipe);
 
220
   }
 
221
   return;
190
222
}
191
223
 
192
224
const int kBlastHSPStream_Error = -1;
193
225
const int kBlastHSPStream_Success = 0;
194
226
const int kBlastHSPStream_Eof = 1;
195
227
 
196
 
/** This method is akin to a vtable dispatcher, invoking the method registered
197
 
 * upon creation of the implementation of the BlastHSPStream interface 
 
228
/** Read one HSP list from the results saved in an HSP list collector. Once an
 
229
 * HSP list is read from the stream, it relinquishes ownership and removes it
 
230
 * from the internal results data structure.
 
231
 * @param hsp_stream The HSP stream to read from [in]
 
232
 * @param hsp_list_out The read HSP list. [out]
 
233
 * @return Success, error, or end of reading, when nothing left to read.
 
234
 */
 
235
int BlastHSPStreamRead(BlastHSPStream* hsp_stream, BlastHSPList** hsp_list_out) 
 
236
{
 
237
   *hsp_list_out = NULL;
 
238
 
 
239
   if (!hsp_stream) 
 
240
      return kBlastHSPStream_Error;
 
241
 
 
242
   if (!hsp_stream->results)
 
243
      return kBlastHSPStream_Eof;
 
244
 
 
245
   /* If this stream is not yet closed for writing, close it. In particular,
 
246
      this includes sorting of results. 
 
247
      NB: to lift the prohibition on write after the first read, the 
 
248
      following 2 lines should be removed, and stream closure for writing 
 
249
      should be done outside of the read function. */
 
250
   if (!hsp_stream->results_sorted)
 
251
       BlastHSPStreamClose(hsp_stream);
 
252
 
 
253
   if (hsp_stream->sort_by_score) {
 
254
       Int4 last_hsplist_index = -1, index = 0;
 
255
       BlastHitList* hit_list = NULL;
 
256
       BlastHSPResults* results = hsp_stream->results;
 
257
 
 
258
       /* Find index of the first query that has results. */
 
259
       for (index = hsp_stream->sort_by_score->first_query_index; 
 
260
            index < results->num_queries; ++index) {
 
261
          if (results->hitlist_array[index] && 
 
262
              results->hitlist_array[index]->hsplist_count > 0)
 
263
             break;
 
264
       }
 
265
       if (index >= results->num_queries)
 
266
          return kBlastHSPStream_Eof;
 
267
 
 
268
       hsp_stream->sort_by_score->first_query_index = index;
 
269
 
 
270
       hit_list = results->hitlist_array[index];
 
271
       last_hsplist_index = hit_list->hsplist_count - 1;
 
272
 
 
273
       *hsp_list_out = hit_list->hsplist_array[last_hsplist_index];
 
274
       /* Assign the query index here so the caller knows which query this HSP 
 
275
          list comes from */
 
276
       (*hsp_list_out)->query_index = index;
 
277
       /* Dequeue this HSP list by decrementing the HSPList count */
 
278
       --hit_list->hsplist_count;
 
279
       if (hit_list->hsplist_count == 0) {
 
280
          /* Advance the first query index, without checking that the next
 
281
           * query has results - that will be done on the next call. */
 
282
          ++hsp_stream->sort_by_score->first_query_index;
 
283
       }
 
284
   } else {
 
285
       /* return the next HSPlist out of the collection stored */
 
286
 
 
287
       if (!hsp_stream->num_hsplists)
 
288
          return kBlastHSPStream_Eof;
 
289
 
 
290
       *hsp_list_out = 
 
291
           hsp_stream->sorted_hsplists[--hsp_stream->num_hsplists];
 
292
 
 
293
   }
 
294
   return kBlastHSPStream_Success;
 
295
}
 
296
 
 
297
/** Write an HSP list to the collector HSP stream. The HSP stream assumes 
 
298
 * ownership of the HSP list and sets the dereferenced pointer to NULL.
 
299
 * @param hsp_stream Stream to write to. [in] [out]
 
300
 * @param hsp_list Pointer to the HSP list to save in the collector. [in]
 
301
 * @return Success or error, if stream is already closed for writing.
 
302
 */
 
303
int BlastHSPStreamWrite(BlastHSPStream* hsp_stream, BlastHSPList** hsp_list)
 
304
{
 
305
   Int2 status = 0;
 
306
 
 
307
   if (!hsp_stream) 
 
308
      return kBlastHSPStream_Error;
 
309
 
 
310
   /** Lock the mutex, if necessary */
 
311
   MT_LOCK_Do(hsp_stream->x_lock, eMT_Lock);
 
312
 
 
313
   /** Prohibit writing after reading has already started. This prohibition
 
314
    *  can be lifted later. There is no inherent problem in using read and
 
315
    *  write in any order, except that sorting would have to be done on 
 
316
    *  every read after a write. 
 
317
    */
 
318
   if (hsp_stream->results_sorted) {
 
319
      MT_LOCK_Do(hsp_stream->x_lock, eMT_Unlock);
 
320
      return kBlastHSPStream_Error;
 
321
   }
 
322
 
 
323
   if (hsp_stream->writer) { 
 
324
       /** if writer has not been initialized, initialize it first */
 
325
      if (!(hsp_stream->writer_initialized)) {
 
326
          (hsp_stream->writer->InitFnPtr)
 
327
                   (hsp_stream->writer->data, hsp_stream->results);
 
328
          hsp_stream->writer_initialized = TRUE;
 
329
      }
 
330
          
 
331
      /** filtering processing */
 
332
      status = (hsp_stream->writer->RunFnPtr)
 
333
               (hsp_stream->writer->data, *hsp_list);
 
334
   }
 
335
 
 
336
   if (status != 0) {
 
337
      MT_LOCK_Do(hsp_stream->x_lock, eMT_Unlock);
 
338
      return kBlastHSPStream_Error;
 
339
   }
 
340
   /* Results structure is no longer sorted, even if it was before. 
 
341
      The following assignment is only necessary if the logic to prohibit
 
342
      writing after the first read is removed. */
 
343
   hsp_stream->results_sorted = FALSE;
 
344
 
 
345
   /* Free the caller from this pointer's ownership. */
 
346
   *hsp_list = NULL;
 
347
 
 
348
   /** Unlock the mutex */
 
349
   MT_LOCK_Do(hsp_stream->x_lock, eMT_Unlock);
 
350
 
 
351
   return kBlastHSPStream_Success;
 
352
}
 
353
 
 
354
/* #define _DEBUG_VERBOSE 1 */
 
355
/** Merge two HSPStreams. The HSPs from the first stream are
 
356
 *  moved to the second stream.
 
357
 * @param squery_blk Structure controlling the merge process [in]
 
358
 * @param chunk_num Unique integer assigned to hsp_stream [in]
 
359
 * @param stream1 The stream to merge [in][out]
 
360
 * @param stream2 The stream that will contain the
 
361
 *         HSPLists of the first stream [in][out]
 
362
 */
 
363
int BlastHSPStreamMerge(SSplitQueryBlk *squery_blk,
 
364
                             Uint4 chunk_num,
 
365
                             BlastHSPStream* stream1,
 
366
                             BlastHSPStream* stream2)
 
367
{
 
368
   Int4 i, j, k;
 
369
   BlastHSPResults *results1 = NULL;
 
370
   BlastHSPResults *results2 = NULL;
 
371
   Int4 contexts_per_query = 0;
 
372
#ifdef _DEBUG
 
373
   Int4 num_queries = 0, num_ctx = 0, num_ctx_offsets = 0;
 
374
   Int4 max_ctx;
 
375
#endif
 
376
   
 
377
   Uint4 *query_list = NULL, *offset_list = NULL, num_contexts = 0;
 
378
   Int4 *context_list = NULL;
 
379
 
 
380
 
 
381
   if (!stream1 || !stream2) 
 
382
       return kBlastHSPStream_Error;
 
383
 
 
384
   results1 = stream1->results;
 
385
   results2 = stream2->results;
 
386
   contexts_per_query = BLAST_GetNumberOfContexts(stream2->program);
 
387
 
 
388
   SplitQueryBlk_GetQueryIndicesForChunk(squery_blk, chunk_num, &query_list);
 
389
   SplitQueryBlk_GetQueryContextsForChunk(squery_blk, chunk_num, 
 
390
                                          &context_list, &num_contexts);
 
391
   SplitQueryBlk_GetContextOffsetsForChunk(squery_blk, chunk_num, &offset_list);
 
392
 
 
393
#if defined(_DEBUG_VERBOSE)
 
394
   fprintf(stderr, "Chunk %d\n", chunk_num);
 
395
   fprintf(stderr, "Queries : ");
 
396
   for (num_queries = 0; query_list[num_queries] != UINT4_MAX; num_queries++)
 
397
       fprintf(stderr, "%d ", query_list[num_queries]);
 
398
   fprintf(stderr, "\n");
 
399
   fprintf(stderr, "Contexts : ");
 
400
   for (num_ctx = 0; num_ctx < num_contexts; num_ctx++)
 
401
       fprintf(stderr, "%d ", context_list[num_ctx]);
 
402
   fprintf(stderr, "\n");
 
403
   fprintf(stderr, "Context starting offsets : ");
 
404
   for (num_ctx_offsets = 0; offset_list[num_ctx_offsets] != UINT4_MAX;
 
405
        num_ctx_offsets++)
 
406
       fprintf(stderr, "%d ", offset_list[num_ctx_offsets]);
 
407
   fprintf(stderr, "\n");
 
408
#elif defined(_DEBUG)
 
409
   for (num_queries = 0; query_list[num_queries] != UINT4_MAX; num_queries++) ;
 
410
   for (num_ctx = 0, max_ctx = INT4_MIN; num_ctx < num_contexts; num_ctx++) 
 
411
       max_ctx = MAX(max_ctx, context_list[num_ctx]);
 
412
   for (num_ctx_offsets = 0; offset_list[num_ctx_offsets] != UINT4_MAX;
 
413
        num_ctx_offsets++) ;
 
414
#endif
 
415
 
 
416
   for (i = 0; i < results1->num_queries; i++) {
 
417
       BlastHitList *hitlist = results1->hitlist_array[i];
 
418
       Int4 global_query = query_list[i];
 
419
       Int4 split_points[NUM_FRAMES];
 
420
#ifdef _DEBUG
 
421
       ASSERT(i < num_queries);
 
422
#endif
 
423
 
 
424
       if (hitlist == NULL) {
 
425
#if defined(_DEBUG_VERBOSE)
 
426
fprintf(stderr, "No hits to query %d\n", global_query);
 
427
#endif
 
428
           continue;
 
429
       }
 
430
 
 
431
       /* we will be mapping HSPs from the local context to
 
432
          their place on the unsplit concatenated query. Once
 
433
          that's done, overlapping HSPs need to get merged, and
 
434
          to do that we must know the offset within each context
 
435
          where the last chunk ended and the current chunk begins */
 
436
       for (j = 0; j < contexts_per_query; j++) {
 
437
           split_points[j] = -1;
 
438
       }
 
439
 
 
440
       for (j = 0; j < contexts_per_query; j++) {
 
441
           Int4 local_context = i * contexts_per_query + j;
 
442
           if (context_list[local_context] >= 0) {
 
443
               split_points[context_list[local_context] % contexts_per_query] = 
 
444
                                offset_list[local_context];
 
445
           }
 
446
       }
 
447
 
 
448
#if defined(_DEBUG_VERBOSE)
 
449
       fprintf(stderr, "query %d split points: ", i);
 
450
       for (j = 0; j < contexts_per_query; j++) {
 
451
           fprintf(stderr, "%d ", split_points[j]);
 
452
       }
 
453
       fprintf(stderr, "\n");
 
454
#endif
 
455
 
 
456
       for (j = 0; j < hitlist->hsplist_count; j++) {
 
457
           BlastHSPList *hsplist = hitlist->hsplist_array[j];
 
458
 
 
459
           for (k = 0; k < hsplist->hspcnt; k++) {
 
460
               BlastHSP *hsp = hsplist->hsp_array[k];
 
461
               Int4 local_context = hsp->context;
 
462
#ifdef _DEBUG
 
463
               ASSERT(local_context <= max_ctx);
 
464
               ASSERT(local_context < num_ctx);
 
465
               ASSERT(local_context < num_ctx_offsets);
 
466
#endif
 
467
 
 
468
               hsp->context = context_list[local_context];
 
469
               hsp->query.offset += offset_list[local_context];
 
470
               hsp->query.end += offset_list[local_context];
 
471
               hsp->query.gapped_start += offset_list[local_context];
 
472
               hsp->query.frame = BLAST_ContextToFrame(stream2->program,
 
473
                                                       hsp->context);
 
474
           }
 
475
 
 
476
           hsplist->query_index = global_query;
 
477
       }
 
478
 
 
479
       Blast_HitListMerge(results1->hitlist_array + i,
 
480
                          results2->hitlist_array + global_query,
 
481
                          contexts_per_query, split_points,
 
482
                          SplitQueryBlk_GetChunkOverlapSize(squery_blk));
 
483
   }
 
484
 
 
485
   /* Sort to the canonical order, which the merge may not have done. */
 
486
   for (i = 0; i < results2->num_queries; i++) {
 
487
       BlastHitList *hitlist = results2->hitlist_array[i];
 
488
       if (hitlist == NULL)
 
489
           continue;
 
490
 
 
491
       for (j = 0; j < hitlist->hsplist_count; j++)
 
492
           Blast_HSPListSortByScore(hitlist->hsplist_array[j]);
 
493
   }
 
494
 
 
495
   stream2->results_sorted = FALSE;
 
496
 
 
497
#if _DEBUG_VERBOSE
 
498
   fprintf(stderr, "new results: %d queries\n", results2->num_queries);
 
499
   for (i = 0; i < results2->num_queries; i++) {
 
500
       BlastHitList *hitlist = results2->hitlist_array[i];
 
501
       if (hitlist == NULL)
 
502
           continue;
 
503
 
 
504
       for (j = 0; j < hitlist->hsplist_count; j++) {
 
505
           BlastHSPList *hsplist = hitlist->hsplist_array[j];
 
506
           fprintf(stderr, 
 
507
                   "query %d OID %d\n", hsplist->query_index, hsplist->oid);
 
508
 
 
509
           for (k = 0; k < hsplist->hspcnt; k++) {
 
510
               BlastHSP *hsp = hsplist->hsp_array[k];
 
511
               fprintf(stderr, "c %d q %d-%d s %d-%d score %d\n", hsp->context,
 
512
                      hsp->query.offset, hsp->query.end,
 
513
                      hsp->subject.offset, hsp->subject.end,
 
514
                      hsp->score);
 
515
           }
 
516
       }
 
517
   }
 
518
#endif
 
519
 
 
520
   sfree(query_list);
 
521
   sfree(context_list);
 
522
   sfree(offset_list);
 
523
 
 
524
   return kBlastHSPStream_Success;
 
525
}
 
526
 
 
527
/** Batch read function for this BlastHSPStream implementation.      
198
528
 * @param hsp_stream The BlastHSPStream object [in]
199
 
 * @param name Name of the method to invoke on hsp_stream [in]
200
 
 * @param hsp_list HSP list to work with [in] [out]
201
 
 * @return kBlastHSPStream_Error on NULL hsp_stream or NULL method pointer 
202
 
 * (i.e.: unimplemented or uninitialized method on the BlastHSPStream 
203
 
 * interface) or return value of the implementation.
 
529
 * @param batch List of HSP lists for the HSPStream to return. The caller
 
530
 * acquires ownership of all HSP lists returned [out]
 
531
 * @return kBlastHSPStream_Success on success, kBlastHSPStream_Error, or
 
532
 * kBlastHSPStream_Eof on end of stream
204
533
 */
205
 
static int 
206
 
_MethodDispatcher(BlastHSPStream* hsp_stream, EMethodName name, 
207
 
                  BlastHSPList** hsp_list)
208
 
{
209
 
    BlastHSPStreamMethod method_fnptr = NULL;
210
 
 
211
 
    if (!hsp_stream) {
212
 
        return kBlastHSPStream_Error;
213
 
    }
214
 
 
215
 
    ASSERT(name < eMethodBoundary); 
216
 
 
217
 
    switch (name) {
218
 
    case eRead:
219
 
        method_fnptr = (*hsp_stream->ReadFnPtr);
220
 
        break;
221
 
 
222
 
    case eWrite:
223
 
        method_fnptr = (*hsp_stream->WriteFnPtr);
224
 
        break;
225
 
 
226
 
    default:
227
 
        abort();    /* should never happen */
228
 
    }
229
 
 
230
 
    if (!method_fnptr) {
231
 
        return kBlastHSPStream_Error;
232
 
    }
233
 
 
234
 
    return (*method_fnptr)(hsp_stream, hsp_list);
235
 
}
236
 
 
237
 
int BlastHSPStreamRead(BlastHSPStream* hsp_stream, BlastHSPList** hsp_list)
238
 
{
239
 
    return _MethodDispatcher(hsp_stream, eRead, hsp_list);
240
 
}
241
 
 
242
 
int BlastHSPStreamWrite(BlastHSPStream* hsp_stream, BlastHSPList** hsp_list)
243
 
{
244
 
    return _MethodDispatcher(hsp_stream, eWrite, hsp_list);
245
 
}
246
 
 
247
 
/*****************************************************************************/
248
 
 
249
 
void* GetData(BlastHSPStream* hsp_stream)
250
 
{
251
 
    if ( !hsp_stream ) {
252
 
        return NULL;
253
 
    }
254
 
 
255
 
    return hsp_stream->DataStructure;
256
 
}
257
 
 
258
 
int SetData(BlastHSPStream* hsp_stream, void* data)
259
 
{
260
 
    if ( !hsp_stream ) {
261
 
        return kBlastHSPStream_Error;
262
 
    }
263
 
 
264
 
    hsp_stream->DataStructure = data;
265
 
 
266
 
    return kBlastHSPStream_Success;
267
 
}
268
 
 
269
 
int SetMethod(BlastHSPStream* hsp_stream, 
270
 
              EMethodName name,
271
 
              BlastHSPStreamFunctionPointerTypes fnptr_type)
272
 
{
273
 
    if ( !hsp_stream ) {
274
 
        return kBlastHSPStream_Error;
275
 
    }
276
 
 
277
 
    ASSERT(name < eMethodBoundary); 
278
 
 
279
 
    switch (name) {
280
 
    case eConstructor:
281
 
        hsp_stream->NewFnPtr = fnptr_type.ctor;
282
 
        break;
283
 
 
284
 
    case eDestructor:
285
 
        hsp_stream->DeleteFnPtr = fnptr_type.dtor;
286
 
        break;
287
 
 
288
 
    case eRead:
289
 
        hsp_stream->ReadFnPtr = fnptr_type.method;
290
 
        break;
291
 
 
292
 
    case eBatchRead:
293
 
        hsp_stream->BatchReadFnPtr = fnptr_type.batch_read;
294
 
        break;
295
 
 
296
 
    case eWrite:
297
 
        hsp_stream->WriteFnPtr = fnptr_type.method;
298
 
        break;
299
 
 
300
 
    case eMerge:
301
 
        hsp_stream->MergeFnPtr = fnptr_type.mergeFn;
302
 
        break;
303
 
 
304
 
    case eClose:
305
 
       hsp_stream->CloseFnPtr = fnptr_type.closeFn;
306
 
       break;
307
 
 
308
 
    default:
309
 
        abort();    /* should never happen */
310
 
    }
311
 
 
312
 
    return kBlastHSPStream_Success;
313
 
}
 
534
int BlastHSPStreamBatchRead(BlastHSPStream* hsp_stream,
 
535
                            BlastHSPStreamResultBatch* batch) 
 
536
{
 
537
   Int4 i;
 
538
   Int4 num_hsplists;
 
539
   Int4 target_oid;
 
540
   BlastHSPList *hsplist;
 
541
 
 
542
   if (!hsp_stream || !batch)
 
543
       return kBlastHSPStream_Error;
 
544
 
 
545
   batch->num_hsplists = 0;
 
546
   if (!hsp_stream->results)
 
547
      return kBlastHSPStream_Eof;
 
548
 
 
549
   /* If this stream is not yet closed for writing, close it. In particular,
 
550
      this includes sorting of results. 
 
551
      NB: to lift the prohibition on write after the first read, the 
 
552
      following 2 lines should be removed, and stream closure for writing 
 
553
      should be done outside of the read function. */
 
554
   if (!hsp_stream->results_sorted)
 
555
      BlastHSPStreamClose(hsp_stream);
 
556
 
 
557
   /* return all the HSPlists with the same subject OID as the
 
558
      last HSPList in the collection stored. We assume there is
 
559
      at most one HSPList per query sequence */
 
560
 
 
561
   num_hsplists = hsp_stream->num_hsplists;
 
562
   if (num_hsplists == 0)
 
563
      return kBlastHSPStream_Eof;
 
564
 
 
565
   hsplist = hsp_stream->sorted_hsplists[num_hsplists - 1];
 
566
   target_oid = hsplist->oid;
 
567
 
 
568
   for (i = 0; i < num_hsplists; i++) {
 
569
       hsplist = hsp_stream->sorted_hsplists[num_hsplists - 1 - i];
 
570
       if (hsplist->oid != target_oid)
 
571
           break;
 
572
 
 
573
       batch->hsplist_array[i] = hsplist;
 
574
   }
 
575
 
 
576
   hsp_stream->num_hsplists = num_hsplists - i;
 
577
   batch->num_hsplists = i;
 
578
 
 
579
   return kBlastHSPStream_Success;
 
580
}
 
581
 
 
582
BlastHSPStreamResultBatch *                                                                                
 
583
Blast_HSPStreamResultBatchInit(Int4 num_hsplists)                                                          
 
584
{                                                                                                          
 
585
    BlastHSPStreamResultBatch *retval = (BlastHSPStreamResultBatch *)                                      
 
586
                             calloc(1, sizeof(BlastHSPStreamResultBatch));                                 
 
587
                                                                                                           
 
588
    retval->hsplist_array = (BlastHSPList **)calloc((size_t)num_hsplists,                                  
 
589
                                               sizeof(BlastHSPList *));                                    
 
590
    return retval;                                                                                         
 
591
}                                                                                                          
 
592
                                                                                                           
 
593
BlastHSPStreamResultBatch *                                                                                
 
594
Blast_HSPStreamResultBatchFree(BlastHSPStreamResultBatch *batch)                                           
 
595
{                                                                                                          
 
596
    if (batch != NULL) {                                                                                   
 
597
        sfree(batch->hsplist_array);                                                                       
 
598
        sfree(batch);                                                                                      
 
599
    }                                                                                                      
 
600
    return NULL;                                                                                           
 
601
}                                                                                                          
 
602
                                                                                                           
 
603
void Blast_HSPStreamResultBatchReset(BlastHSPStreamResultBatch *batch)                                     
 
604
{                                                                                                          
 
605
    Int4 i;                                                                                                
 
606
    for (i = 0; i < batch->num_hsplists; i++) {                                                            
 
607
        batch->hsplist_array[i] =                                                                          
 
608
           Blast_HSPListFree(batch->hsplist_array[i]);                                                     
 
609
    }                                                                                                      
 
610
    batch->num_hsplists = 0;                                                                               
 
611
}                                                
 
612
 
 
613
BlastHSPStream* 
 
614
BlastHSPStreamNew(EBlastProgramType program, 
 
615
                  const BlastExtensionOptions* extn_opts,
 
616
                  Boolean sort_on_read,
 
617
                  Int4 num_queries,
 
618
                  BlastHSPWriter *writer)
 
619
{
 
620
    BlastHSPStream* hsp_stream = 
 
621
       (BlastHSPStream*) malloc(sizeof(BlastHSPStream));
 
622
 
 
623
    hsp_stream->program = program;
 
624
 
 
625
    hsp_stream->num_hsplists = 0;
 
626
    hsp_stream->num_hsplists_alloc = 100;
 
627
    hsp_stream->sorted_hsplists = (BlastHSPList **)malloc(
 
628
                                           hsp_stream->num_hsplists_alloc *
 
629
                                           sizeof(BlastHSPList *));
 
630
    hsp_stream->results = Blast_HSPResultsNew(num_queries);
 
631
 
 
632
    hsp_stream->results_sorted = FALSE;
 
633
 
 
634
    /* This is needed to meet a pre-condition of the composition-based
 
635
     * statistics code */
 
636
    if ((Blast_QueryIsProtein(program) || Blast_QueryIsPssm(program)) &&
 
637
        extn_opts->compositionBasedStats != 0) {
 
638
        hsp_stream->sort_by_score = 
 
639
            (SSortByScoreStruct*)calloc(1, sizeof(SSortByScoreStruct));
 
640
        hsp_stream->sort_by_score->sort_on_read = sort_on_read;
 
641
        hsp_stream->sort_by_score->first_query_index = 0;
 
642
    } else {
 
643
        hsp_stream->sort_by_score = NULL;
 
644
    }
 
645
    hsp_stream->x_lock = NULL;
 
646
    hsp_stream->writer = writer;
 
647
    hsp_stream->writer_initialized = FALSE;
 
648
    hsp_stream->pre_pipe = NULL;
 
649
    hsp_stream->tback_pipe = NULL;
 
650
 
 
651
    return hsp_stream;
 
652
}
 
653
 
 
654
int BlastHSPStreamRegisterMTLock(BlastHSPStream* hsp_stream,
 
655
                                 MT_LOCK lock)
 
656
{
 
657
    /* only one lock can be registered */
 
658
    if (!hsp_stream || (hsp_stream->x_lock && lock)) {
 
659
        MT_LOCK_Delete(lock);
 
660
        return -1;
 
661
    }
 
662
    hsp_stream->x_lock = lock;
 
663
    return 0;
 
664
}
 
665
 
 
666
int BlastHSPStreamRegisterPipe(BlastHSPStream* hsp_stream,
 
667
                               BlastHSPPipe* pipe,
 
668
                               EBlastStage stage)
 
669
{
 
670
    BlastHSPPipe *p;
 
671
 
 
672
    if (!hsp_stream || !pipe) {
 
673
        return -1;
 
674
    }
 
675
 
 
676
    pipe->next = NULL;
 
677
 
 
678
    switch(stage) {
 
679
    case ePrelimSearch:
 
680
        p = hsp_stream->pre_pipe; 
 
681
        if (!p) {
 
682
            hsp_stream->pre_pipe = pipe;
 
683
            return 0;
 
684
        }
 
685
        break;
 
686
    case eTracebackSearch:
 
687
        p = hsp_stream->tback_pipe; 
 
688
        if (!p) {
 
689
            hsp_stream->tback_pipe = pipe;
 
690
            return 0;
 
691
        }
 
692
        break;
 
693
    default:
 
694
        return -1;
 
695
    }
 
696
 
 
697
    /* insert the pipe at the end */
 
698
    for (; p && p->next; p = p->next);
 
699
    p->next = pipe;
 
700
   
 
701
    return 0;
 
702
}
 
703
 
 
704
BlastHSPWriter*
 
705
BlastHSPWriterNew (BlastHSPWriterInfo** writer_info,
 
706
                   BlastQueryInfo* query_info)
 
707
{
 
708
    BlastHSPWriter * writer = NULL;
 
709
    if(writer_info && *writer_info) {
 
710
        writer = ((*writer_info)->NewFnPtr) ((*writer_info)->params, query_info);
 
711
        sfree(*writer_info);
 
712
    }
 
713
    ASSERT(writer_info && *writer_info == NULL);
 
714
    return writer;
 
715
}
 
716
 
 
717
BlastHSPPipeInfo*
 
718
BlastHSPPipeInfo_Add(BlastHSPPipeInfo** head,
 
719
                     BlastHSPPipeInfo* node)
 
720
{
 
721
    if (head) {
 
722
        if (*head) {
 
723
            BlastHSPPipeInfo* tmp = *head;
 
724
            while (tmp->next) {
 
725
                tmp = tmp->next;
 
726
            }
 
727
            tmp->next = node;
 
728
        } else {
 
729
            *head = node;
 
730
        }
 
731
    }
 
732
    return node;
 
733
}
 
734
 
 
735
BlastHSPPipe*
 
736
BlastHSPPipeNew (BlastHSPPipeInfo** pipe_info,
 
737
                 BlastQueryInfo* query_info)
 
738
{
 
739
    BlastHSPPipe *pipe = NULL;
 
740
    BlastHSPPipe *p = pipe;
 
741
    BlastHSPPipeInfo *info = *pipe_info;
 
742
    BlastHSPPipeInfo *q = info;
 
743
 
 
744
    while(info) {
 
745
        if (p) {
 
746
            p->next = (info->NewFnPtr) (info->params, query_info);
 
747
            p = p->next;
 
748
        } else {
 
749
            pipe = (info->NewFnPtr) (info->params, query_info);
 
750
            p = pipe;
 
751
        }
 
752
        p->next = NULL;
 
753
        q = info;
 
754
        info = info->next;
 
755
        sfree(q);
 
756
    }
 
757
    *pipe_info = NULL;
 
758
    return pipe;
 
759
}
 
760