/* $Id: hspstream_queue.c,v 1.3 2004/06/22 16:22:13 dondosha Exp $
 * ===========================================================================
 *
 *                            PUBLIC DOMAIN NOTICE
 *               National Center for Biotechnology Information
 *
 *  This software/database is a "United States Government Work" under the
 *  terms of the United States Copyright Act.  It was written as part of
 *  the author's official duties as a United States Government employee and
 *  thus cannot be copyrighted.  This software/database is freely available
 *  to the public for use. The National Library of Medicine and the U.S.
 *  Government have not placed any restriction on its use or reproduction.
 *
 *  Although all reasonable efforts have been taken to ensure the accuracy
 *  and reliability of the software and data, the NLM and the U.S.
 *  Government do not and cannot warrant the performance or results that
 *  may be obtained by using this software or data. The NLM and the U.S.
 *  Government disclaim all warranties, express or implied, including
 *  warranties of performance, merchantability or fitness for any particular
 *  purpose.
 *
 *  Please cite the author in any work or product based on this material.
 *
 * ===========================================================================
 *
 * Author: Ilya Dondoshansky
 *
 */
/** @file hspstream_queue.c
 * Implementation of the BlastHSPStream interface for producing BLAST results
 * on the fly.
 */
/** Revision-control identification string ($Id$ keyword) for this file. */
static char const rcsid[] =
    "$Id: hspstream_queue.c,v 1.3 2004/06/22 16:22:13 dondosha Exp $";
39
#include <algo/blast/core/blast_hits.h>
40
#include <algo/blast/api/hspstream_queue.h>
43
/** Default hit saving stream methods */
45
/** Deallocate memory for the BlastHSPStream with an HSP list queue data
47
* @param hsp_stream HSP stream to free [in]
50
static BlastHSPStream*
51
BlastHSPListQueueFree(BlastHSPStream* hsp_stream)
53
BlastHSPListQueueData* stream_data =
54
(BlastHSPListQueueData*) GetData(hsp_stream);
57
NlmSemaDestroy(stream_data->m_resultsSema);
58
NlmMutexDestroy(stream_data->m_resultsMutex);
60
for (node = stream_data->m_queueStart; node; node = node->next) {
61
node->ptr = (void*) Blast_HSPListFree((BlastHSPList*)node->ptr);
63
stream_data->m_queueStart = ListNodeFree(stream_data->m_queueStart);
69
/** Read one HSP list from a queue of HSP lists. If the queue is empty, this
70
* function waits for more results to be written, unless results queue is
71
* already closed for writing.
72
* @param hsp_stream HSP list stream to read from [in]
73
* @param hsp_list_out The read HSP list. NULL, if there is nothing left
74
* in the queue to read.
75
* @return Status: success, error or end of reading.
78
BlastHSPListQueueRead(BlastHSPStream* hsp_stream,
79
BlastHSPList** hsp_list_out)
81
BlastHSPListQueueData* stream_data =
82
(BlastHSPListQueueData*) GetData(hsp_stream);
83
int status = kBlastHSPStream_Error;
86
NlmMutexLockEx(&stream_data->m_resultsMutex);
88
if (!stream_data->m_writingDone) {
89
while (!stream_data->m_writingDone && !stream_data->m_queueStart) {
90
/* Decrement the semaphore count to 0, then wait for it to be
91
* incremented. Note that mutex must be locked whenever the
92
* contents of the stream are checked, but it must be unlocked
93
* for the semaphore wait. */
94
NlmMutexUnlock(stream_data->m_resultsMutex);
95
NlmSemaWait(stream_data->m_resultsSema);
96
NlmMutexLockEx(&stream_data->m_resultsMutex);
100
if (!stream_data->m_queueStart) {
101
/* Nothing in the queue, but no more writing to the queue is expected. */
102
*hsp_list_out = NULL;
103
status = kBlastHSPStream_Eof;
105
ListNode* start_node = stream_data->m_queueStart;
107
*hsp_list_out = (BlastHSPList*) start_node->ptr;
109
stream_data->m_queueStart = start_node->next;
110
start_node->next = NULL;
111
ListNodeFree(start_node);
112
if (!stream_data->m_queueStart)
113
stream_data->m_queueEnd = NULL;
114
status = kBlastHSPStream_Success;
117
NlmMutexUnlock(stream_data->m_resultsMutex);
122
/** Write an HSP list to the results queue.
123
* @param hsp_stream BlastHSPStream to write to. [in]
124
* @param hsp_list Pointer to an HSP list to save in the queue. The HSP stream
125
* takes ownership of the HSP list and sets the dereferenced
126
* pointer to NULL [in]
127
* @return Status: success or error, if queue is already closed for writing.
130
BlastHSPListQueueWrite(BlastHSPStream* hsp_stream,
131
BlastHSPList** hsp_list)
133
BlastHSPListQueueData* stream_data =
134
(BlastHSPListQueueData*) GetData(hsp_stream);
136
/* If input is empty, don't do anything, but return success */
137
if (*hsp_list == NULL)
138
return kBlastHSPStream_Success;
140
/* If stream is closed for writing, return error */
141
if (stream_data->m_writingDone)
142
return kBlastHSPStream_Error;
144
NlmMutexLockEx(&stream_data->m_resultsMutex);
145
stream_data->m_queueEnd =
146
ListNodeAddPointer(&stream_data->m_queueEnd, 0, (void*)(*hsp_list));
147
if (!stream_data->m_queueStart)
148
stream_data->m_queueStart = stream_data->m_queueEnd;
149
/* Free the caller from this pointer's ownership. */
151
/* Increment the semaphore count. */
152
NlmSemaPost(stream_data->m_resultsSema);
153
NlmMutexUnlock(stream_data->m_resultsMutex);
155
return kBlastHSPStream_Success;
158
/** Prohibit any future writing to the HSP list queue. Also increment the
159
* read semaphore, to allow exit out of the wait state in the reading function.
160
* @param hsp_stream The BlastHSPStream pointer [in] [out]
163
BlastHSPListQueueClose(BlastHSPStream* hsp_stream)
165
BlastHSPListQueueData* stream_data =
166
(BlastHSPListQueueData*) GetData(hsp_stream);
167
NlmMutexLockEx(&stream_data->m_resultsMutex);
168
stream_data->m_writingDone = TRUE;
169
/* Increment the semaphore count so the reading thread can get out of
170
* the waiting state and check the m_writingDone variable. */
171
NlmSemaPost(stream_data->m_resultsSema);
172
NlmMutexUnlock(stream_data->m_resultsMutex);
175
/** Set functions pointers and data structure pointer in a new BlastHSPStream
176
* with an HSP list queue data structure.
177
* @param hsp_stream The BlastHSPStream to initialize [in] [out]
178
* @param args Pointer to the HSP list queue data structure [in]
180
static BlastHSPStream*
181
BlastHSPListQueueNew(BlastHSPStream* hsp_stream, void* args)
183
BlastHSPStreamFunctionPointerTypes fnptr;
185
fnptr.dtor = &BlastHSPListQueueFree;
186
SetMethod(hsp_stream, eDestructor, fnptr);
187
fnptr.method = &BlastHSPListQueueRead;
188
SetMethod(hsp_stream, eRead, fnptr);
189
fnptr.method = &BlastHSPListQueueWrite;
190
SetMethod(hsp_stream, eWrite, fnptr);
191
fnptr.closeFn = &BlastHSPListQueueClose;
192
SetMethod(hsp_stream, eClose, fnptr);
194
SetData(hsp_stream, args);
198
/** Create a new BlastHSPStream with an HSP list queue data structure. */
199
BlastHSPStream* Blast_HSPListQueueInit()
201
BlastHSPListQueueData* stream_data =
202
(BlastHSPListQueueData*) calloc(1, sizeof(BlastHSPListQueueData));
203
BlastHSPStreamNewInfo info;
205
/* At the start of the search there is nothing in the results queue, so
206
* initialize the semaphore count with 0. */
207
stream_data->m_resultsSema = NlmSemaInit(0);
208
info.constructor = &BlastHSPListQueueNew;
209
info.ctor_argument = (void*)stream_data;
211
return BlastHSPStreamNew(&info);