~ubuntu-branches/ubuntu/hardy/freeradius/hardy-proposed

« back to all changes in this revision

Viewing changes to src/main/threads.c

  • Committer: Bazaar Package Importer
  • Author(s): Mark Hymers
  • Date: 2006-12-16 20:45:11 UTC
  • mfrom: (3.1.10 feisty)
  • Revision ID: james.westby@ubuntu.com-20061216204511-3pbbsu4s8jtehsor
Tags: 1.1.3-3
Fix POSIX compliance problem in init script.  Closes: #403384. 

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/*
2
2
 * threads.c    request threading support
3
3
 *
4
 
 * Version:     $Id: threads.c,v 1.77 2004/05/07 19:37:57 aland Exp $
 
4
 * Version:     $Id: threads.c,v 1.77.2.7 2005/10/17 18:38:07 aland Exp $
5
5
 *
6
6
 *   This program is free software; you can redistribute it and/or modify
7
7
 *   it under the terms of the GNU General Public License as published by
27
27
 
28
28
#include <stdlib.h>
29
29
#include <string.h>
 
30
 
 
31
/*
 
32
 *      Other OS's have sem_init, OS X doesn't.
 
33
 */
 
34
#ifndef DARWIN
30
35
#include <semaphore.h>
 
36
#else
 
37
#include <mach/task.h>
 
38
#include <mach/semaphore.h>
 
39
 
 
40
#undef sem_t
 
41
#define sem_t semaphore_t
 
42
#undef sem_init
 
43
#define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
 
44
#undef sem_wait
 
45
#define sem_wait(s) semaphore_wait(*s)
 
46
#undef sem_post
 
47
#define sem_post(s) semaphore_signal(*s)
 
48
#endif
 
49
 
31
50
#include <signal.h>
32
51
 
33
52
#ifdef HAVE_SYS_WAIT_H
39
58
#include "conffile.h"
40
59
 
41
60
static const char rcsid[] =
42
 
"$Id: threads.c,v 1.77 2004/05/07 19:37:57 aland Exp $";
 
61
"$Id: threads.c,v 1.77.2.7 2005/10/17 18:38:07 aland Exp $";
43
62
 
44
63
#define SEMAPHORE_LOCKED        (0)
45
64
#define SEMAPHORE_UNLOCKED      (1)
79
98
} request_queue_t;
80
99
 
81
100
 
 
101
#define MAX_WAITERS (256)
82
102
/*
83
103
 *      A data structure to manage the thread pool.  There's no real
84
104
 *      need for a data structure, but it makes things conceptually
100
120
        int cleanup_delay;
101
121
 
102
122
        /*
 
123
         *      If threaded, we have to pay more attention to
 
124
         *      child PID's when we fork...
 
125
         */
 
126
        pthread_mutex_t wait_mutex;
 
127
        int             wait_head;
 
128
        int             wait_tail;
 
129
        pid_t           wait[MAX_WAITERS];
 
130
 
 
131
        /*
103
132
         *      All threads wait on this semaphore, for requests
104
133
         *      to enter the queue.
105
134
         */
120
149
static THREAD_POOL thread_pool;
121
150
static int pool_initialized = FALSE;
122
151
 
123
 
/*
124
 
 *      Data structure to keep track of which child forked which
125
 
 *      request.  If we cared, we'd keep a list of "free" and "active"
126
 
 *      entries.
127
 
 *
128
 
 *      FIXME: Have a time out, so we clean up entries which haven't
129
 
 *      been picked up!
130
 
 */
131
 
typedef struct rad_fork_t {
132
 
        pthread_t       thread_id;
133
 
        pid_t           child_pid;
134
 
        sem_t           child_done;
135
 
        int             status; /* exit status of the child */
136
 
        time_t          time_forked;
137
 
} rad_fork_t;
138
 
 
139
 
/*
140
 
 *  This MUST be a power of 2 for it to work properly!
141
 
 */
142
 
#define NUM_FORKERS (8192)
143
 
static rad_fork_t forkers[NUM_FORKERS];
144
 
 
145
 
/*
146
 
 *      This mutex ensures that only one thread is doing certain
147
 
 *      kinds of magic to the previous array.
148
 
 */
149
 
static pthread_mutex_t fork_mutex;
150
 
 
151
152
 
152
153
/*
153
154
 *      A mapping of configuration file names to internal integers
164
165
 
165
166
 
166
167
/*
 
168
 *      We don't want to catch SIGCHLD for a host of reasons.
 
169
 *
 
170
 *      - exec_wait means that someone, somewhere, somewhen, will
 
171
 *      call waitpid(), and catch the child.
 
172
 *
 
173
 *      - SIGCHLD is delivered to a random thread, not the one that
 
174
 *      forked.
 
175
 *
 
176
 *      - if another thread catches the child, we have to coordinate
 
177
 *      with the thread doing the waiting.
 
178
 *
 
179
 *      - if we don't waitpid() for non-wait children, they'll be zombies,
 
180
 *      and will hang around forever.
 
181
 *
 
182
 */
 
183
static void reap_children(void)
 
184
{
 
185
        if (thread_pool.wait_head != thread_pool.wait_tail) {
 
186
                int num;
 
187
                
 
188
                pthread_mutex_lock(&thread_pool.wait_mutex);
 
189
                for (num = ((thread_pool.wait_tail + MAX_WAITERS) - thread_pool.wait_head) % MAX_WAITERS;
 
190
                     num != 0;
 
191
                     num--) {
 
192
                        pid_t pid = thread_pool.wait[thread_pool.wait_head];
 
193
                        
 
194
                        thread_pool.wait_head++;
 
195
                        thread_pool.wait_head %= MAX_WAITERS;
 
196
                        
 
197
                        /*
 
198
                         *      Child is still alive: move it to the tail.
 
199
                         */
 
200
                        if (waitpid(pid, NULL, WNOHANG) == 0) {
 
201
                                if (((thread_pool.wait_tail + 1) % MAX_WAITERS)
 
202
                                    == thread_pool.wait_head) {
 
203
                                        rad_assert(0 == 1);
 
204
                                }
 
205
 
 
206
                                thread_pool.wait[thread_pool.wait_tail] = pid;
 
207
                                thread_pool.wait_tail++;
 
208
                                thread_pool.wait_tail %= MAX_WAITERS;
 
209
                        } /* else no child, or was already reaped */
 
210
                }
 
211
                pthread_mutex_unlock(&thread_pool.wait_mutex);
 
212
        }
 
213
}
 
214
 
 
215
 
 
216
/*
167
217
 *      Add a request to the list of waiting requests.
168
218
 *      This function gets called ONLY from the main handler thread...
169
219
 *
170
220
 *      This function should never fail.
171
221
 */
172
 
static void request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
 
222
static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
173
223
{
174
224
        int num_entries;
175
225
 
195
245
        num_entries = ((thread_pool.queue_tail + thread_pool.queue_size) -
196
246
                       thread_pool.queue_head) % thread_pool.queue_size;
197
247
        if (num_entries == (thread_pool.queue_size - 1)) {
 
248
                int i;
198
249
                request_queue_t *new_queue;
199
250
 
200
251
                /*
209
260
                         */
210
261
                        radlog(L_ERR|L_CONS, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
211
262
                        request->finished = TRUE;
212
 
                        return;
 
263
                        return 0;
213
264
                }
214
265
 
215
266
                /*
220
271
                 *      new one.
221
272
                 */
222
273
                new_queue = rad_malloc(sizeof(*new_queue) * thread_pool.queue_size * 2);
223
 
                memcpy(new_queue, thread_pool.queue,
224
 
                       sizeof(*new_queue) * thread_pool.queue_size);
225
 
                memset(new_queue + sizeof(*new_queue) * thread_pool.queue_size,
 
274
                /*
 
275
                 *      Copy the queue element by element
 
276
                 */
 
277
                for (i = 0; i < thread_pool.queue_size; i++) {
 
278
                        new_queue[i] = thread_pool.queue[(i + thread_pool.queue_head) % thread_pool.queue_size];
 
279
                }
 
280
                memset(new_queue + thread_pool.queue_size,
226
281
                       0, sizeof(*new_queue) * thread_pool.queue_size);
227
282
 
228
283
                free(thread_pool.queue);
229
284
                thread_pool.queue = new_queue;
 
285
                thread_pool.queue_tail = ((thread_pool.queue_tail + thread_pool.queue_size) - thread_pool.queue_head) % thread_pool.queue_size;
 
286
                thread_pool.queue_head = 0;
230
287
                thread_pool.queue_size *= 2;
231
288
        }
232
289
 
254
311
 
255
312
        sem_post(&thread_pool.semaphore);
256
313
 
257
 
        return;
 
314
        return 1;
258
315
}
259
316
 
260
317
/*
262
319
 */
263
320
static void request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
264
321
{
 
322
        reap_children();
 
323
 
265
324
        pthread_mutex_lock(&thread_pool.mutex);
266
325
 
267
326
        /*
661
720
                thread_pool.total_threads = 0;
662
721
                thread_pool.max_thread_num = 1;
663
722
                thread_pool.cleanup_delay = 5;
 
723
 
 
724
                thread_pool.wait_head = thread_pool.wait_tail = 0;
 
725
                if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
 
726
                        radlog(L_ERR, "FATAL: Failed to initialize mutex: %s",
 
727
                               strerror(errno));
 
728
                        exit(1);
 
729
                }               
664
730
        }
665
731
 
666
732
        pool_cf = cf_section_find("thread");
669
735
        }
670
736
 
671
737
        /*
672
 
         *      Limit the maximum number of threads to the maximum
673
 
         *      number of forks we can do.
674
 
         *
675
 
         *      FIXME: Make this code better...
676
 
         */
677
 
        if (thread_pool.max_threads >= NUM_FORKERS) {
678
 
                thread_pool.max_threads = NUM_FORKERS;
679
 
        }
680
 
 
681
 
 
682
 
        /*
683
738
         *      The pool has already been initialized.  Don't spawn
684
739
         *      new threads, and don't forget about forked children,
685
740
         */
742
797
int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
743
798
{
744
799
        /*
 
800
         *      Add the new request to the queue.
 
801
         */
 
802
        if (!request_enqueue(request, fun)) return 0;
 
803
 
 
804
        /*
745
805
         *      If the thread pool is busy handling requests, then
746
806
         *      try to spawn another one.
747
807
         */
750
810
                        radlog(L_INFO,
751
811
                               "The maximum number of threads (%d) are active, cannot spawn new thread to handle request",
752
812
                               thread_pool.max_threads);
753
 
                        return 0;
 
813
                        return 1;
754
814
                }
755
815
        }
756
816
 
757
 
        /*
758
 
         *      Add the new request to the queue.
759
 
         */
760
 
        request_enqueue(request, fun);
761
 
 
762
817
        return 1;
763
818
}
764
819
 
923
978
        return 0;
924
979
}
925
980
 
926
 
static int exec_initialized = FALSE;
927
 
 
928
 
/*
929
 
 *      Initialize the stuff for keeping track of child processes.
930
 
 */
931
 
void rad_exec_init(void)
932
 
{
933
 
        int i;
934
 
 
935
 
        /*
936
 
         *      Initialize the mutex used to remember calls to fork.
937
 
         */
938
 
        pthread_mutex_init(&fork_mutex, NULL);
939
 
 
940
 
        /*
941
 
         *      Initialize the data structure where we remember the
942
 
         *      mappings of thread ID && child PID to exit status.
943
 
         */
944
 
        for (i = 0; i < NUM_FORKERS; i++) {
945
 
                forkers[i].thread_id = NO_SUCH_CHILD_PID;
946
 
                forkers[i].child_pid = -1;
947
 
                forkers[i].status = 0;
948
 
        }
949
 
 
950
 
        exec_initialized = TRUE;
951
 
}
952
 
 
953
 
/*
954
 
 *      We use the PID number as a base for the array index, so that
955
 
 *      we can quickly turn the PID into a free array entry, instead
956
 
 *      of rooting blindly through the entire array.
957
 
 */
958
 
#define PID_2_ARRAY(pid) (((int) pid ) & (NUM_FORKERS - 1))
959
981
 
960
982
/*
961
983
 *      Thread wrapper for fork().
962
984
 */
963
985
pid_t rad_fork(int exec_wait)
964
986
{
965
 
        sigset_t set;
966
987
        pid_t child_pid;
967
988
 
968
 
        /*
969
 
         *      The thread is NOT interested in waiting for the exit
970
 
         *      status of the child process, so we don't bother
971
 
         *      updating our kludgy array.
972
 
         *
973
 
         *      Or, there no NO threads, so we can just do the fork
974
 
         *      thing.
975
 
         */
976
 
        if (!exec_wait || !exec_initialized) {
977
 
                return fork();
 
989
        if (exec_wait) return fork();
 
990
 
 
991
        /*
 
992
         *      Ensure that children are reaped always.
 
993
         */
 
994
        reap_children();
 
995
 
 
996
        /*
 
997
         *      Lock the mutex.
 
998
         */
 
999
        pthread_mutex_lock(&thread_pool.wait_mutex);
 
1000
 
 
1001
        /*
 
1002
         *      No room to save the PID: die.
 
1003
         */
 
1004
        if (((thread_pool.wait_tail + 1) % MAX_WAITERS)
 
1005
            == thread_pool.wait_head) {
 
1006
                rad_assert(0 == 1);
978
1007
        }
979
1008
 
980
1009
        /*
981
 
         *      Block SIGCLHD until such time as we've saved the PID.
982
 
         *
983
 
         *      Note that we block SIGCHLD for ALL threads associated
984
 
         *      with this process!  This is to prevent race conditions!
985
 
         */
986
 
        sigemptyset(&set);
987
 
        sigaddset(&set, SIGCHLD);
988
 
        sigprocmask(SIG_BLOCK, &set, NULL);
989
 
 
990
 
        /*
991
 
         *      Do the fork.
 
1010
         *      Fork & save the PID for later reaping.
992
1011
         */
993
1012
        child_pid = fork();
994
 
 
995
 
        /*
996
 
         *      We managed to fork.  Let's see if we have a free
997
 
         *      array entry.
998
 
         */
999
 
        if (child_pid > 0) { /* parent */
1000
 
                int i;
1001
 
                int found;
1002
 
                time_t now = time(NULL);
1003
 
 
1004
 
                /*
1005
 
                 *      We store the information in the array
1006
 
                 *      indexed by PID.  This means that we have
1007
 
                 *      on average an O(1) lookup to find the element,
1008
 
                 *      instead of rooting through the entire array.
1009
 
                 */
1010
 
                i = PID_2_ARRAY(child_pid);
1011
 
                found = -1;
1012
 
 
1013
 
                /*
1014
 
                 *      We may have multiple threads trying to find an
1015
 
                 *      empty position, so we lock the array until
1016
 
                 *      we've found an entry.
1017
 
                 */
1018
 
                pthread_mutex_lock(&fork_mutex);
1019
 
                do {
1020
 
                        if (forkers[i].thread_id == NO_SUCH_CHILD_PID) {
1021
 
                                found = i;
1022
 
                                break;
1023
 
                        }
1024
 
 
1025
 
                        /*
1026
 
                         *      Clean up any stale forked sessions.
1027
 
                         *
1028
 
                         *      This sometimes happens, for crazy reasons.
1029
 
                         */
1030
 
                        if ((now - forkers[i].time_forked) > 30) {
1031
 
                                forkers[i].thread_id = NO_SUCH_CHILD_PID;
1032
 
 
1033
 
                                /*
1034
 
                                 *      Grab the child's exit condition,
1035
 
                                 *      just in case...
1036
 
                                 */
1037
 
                                waitpid(forkers[i].child_pid,
1038
 
                                        &forkers[i].status,
1039
 
                                        WNOHANG);
1040
 
                                sem_destroy(&forkers[i].child_done);
1041
 
                                found = i;
1042
 
                                break;
1043
 
                        }
1044
 
 
1045
 
                        /*
1046
 
                         *  Increment it, within the array.
1047
 
                         */
1048
 
                        i++;
1049
 
                        i &= (NUM_FORKERS - 1);
1050
 
                } while (i != PID_2_ARRAY(child_pid));
1051
 
 
1052
 
                /*
1053
 
                 *      Arg.  We did a fork, and there was nowhere to
1054
 
                 *      put the answer.
1055
 
                 */
1056
 
                if (found < 0) {
1057
 
                        sigprocmask(SIG_UNBLOCK, &set, NULL);
1058
 
                        pthread_mutex_unlock(&fork_mutex);
1059
 
                        return (pid_t) -1;
1060
 
                }
1061
 
 
1062
 
                /*
1063
 
                 *      In the parent, set the status, and create the
1064
 
                 *      semaphore.
1065
 
                 */
1066
 
                forkers[found].status = -1;
1067
 
                forkers[found].child_pid = child_pid;
1068
 
                forkers[found].thread_id = pthread_self();
1069
 
                forkers[found].time_forked = now;
1070
 
                sem_init(&forkers[found].child_done, 0, SEMAPHORE_LOCKED);
1071
 
                pthread_mutex_unlock(&fork_mutex);
 
1013
        if (child_pid != 0) {
 
1014
                thread_pool.wait[thread_pool.wait_tail] = child_pid;
 
1015
                thread_pool.wait_tail++;
 
1016
                thread_pool.wait_tail %= MAX_WAITERS;
 
1017
 
 
1018
                /*
 
1019
                 *      Unlock the mutex.
 
1020
                 */
 
1021
                pthread_mutex_unlock(&thread_pool.wait_mutex);
1072
1022
        }
1073
1023
 
1074
1024
        /*
1075
 
         *      Unblock SIGCHLD, now that there's no chance of bad entries
1076
 
         *      in the array.
1077
 
         */
1078
 
        sigprocmask(SIG_UNBLOCK, &set, NULL);
1079
 
 
1080
 
        /*
1081
1025
         *      Return whatever we were told.
1082
1026
         */
1083
1027
        return child_pid;
1084
1028
}
1085
1029
 
 
1030
 
1086
1031
/*
1087
 
 *      Thread wrapper for waitpid(), so threads can wait for
1088
 
 *      the PID they forked.
 
1032
 *      We may not need this any more...
1089
1033
 */
1090
1034
pid_t rad_waitpid(pid_t pid, int *status, int options)
1091
1035
{
1092
 
        int i, rcode;
1093
 
        int found;
1094
 
        pthread_t self = pthread_self();
1095
 
 
1096
 
        /*
1097
 
         *      We're only allowed to wait for a SPECIFIC pid.
1098
 
         */
1099
 
        if (pid <= 0) {
1100
 
                return -1;
1101
 
        }
1102
 
 
1103
 
        /*
1104
 
         *      Find the PID to wait for, starting at an index within
1105
 
         *      the array.  This makes the lookups O(1) on average,
1106
 
         *      instead of O(n), when the array is filling up.
1107
 
         */
1108
 
        found = -1;
1109
 
        i = PID_2_ARRAY(pid);
1110
 
        do {
1111
 
                /*
1112
 
                 *      We were the ones who forked this specific
1113
 
                 *      child.
1114
 
                 */
1115
 
                if ((forkers[i].thread_id == self) &&
1116
 
                    (forkers[i].child_pid == pid)) {
1117
 
                        found = i;
1118
 
                        break;
1119
 
                }
1120
 
 
1121
 
                i++;
1122
 
                i &= (NUM_FORKERS - 1);
1123
 
        } while (i != PID_2_ARRAY(pid));
1124
 
 
1125
 
        /*
1126
 
         *      No thread ID found: we're trying to wait for a child
1127
 
         *      we've never forked!
1128
 
         */
1129
 
        if (found < 0) {
1130
 
                return -1;
1131
 
        }
1132
 
 
1133
 
        /*
1134
 
         *      Wait for the signal that the child's status has been
1135
 
         *      returned.
1136
 
         */
1137
 
        if (options == WNOHANG) {
1138
 
                rcode = sem_trywait(&forkers[found].child_done);
1139
 
                if (rcode != 0) {
1140
 
                        return 0; /* no child available */
1141
 
                }
1142
 
        } else {                /* wait forever */
1143
 
        re_wait:
1144
 
                rcode = sem_wait(&forkers[found].child_done);
1145
 
                if ((rcode != 0) && (errno == EINTR)) {
1146
 
                        goto re_wait;
1147
 
                }
1148
 
        }
1149
 
 
1150
 
        /*
1151
 
         *      We've got the semaphore.  Now destroy it.
1152
 
         *
1153
 
         *      FIXME: Maybe we want to set up the semaphores in advance,
1154
 
         *      to prevent the creation && deletion of lots of them,
1155
 
         *      if creating and deleting them is expensive.
1156
 
         */
1157
 
        sem_destroy(&forkers[found].child_done);
1158
 
 
1159
 
        /*
1160
 
         *      Save the status BEFORE we re-set the thread ID.
1161
 
         */
1162
 
        *status = forkers[found].status;
1163
 
 
1164
 
        /*
1165
 
         *      This next line taints the other array entries,
1166
 
         *      due to other threads re-using the data structure.
1167
 
         */
1168
 
        forkers[found].thread_id = NO_SUCH_CHILD_PID;
1169
 
 
1170
 
        return pid;
1171
 
}
1172
 
 
1173
 
/*
1174
 
 *      Called by the main signal handler, to save the status of the child
1175
 
 */
1176
 
int rad_savepid(pid_t pid, int status)
1177
 
{
1178
 
        int i;
1179
 
 
1180
 
        /*
1181
 
         *      Find the PID to wait for, starting at an index within
1182
 
         *      the array.  This makes the lookups O(1) on average,
1183
 
         *      instead of O(n), when the array is filling up.
1184
 
         */
1185
 
        i = PID_2_ARRAY(pid);
1186
 
 
1187
 
        /*
1188
 
         *      Do NOT lock the array, as nothing else sets the
1189
 
         *      status and posts the semaphore.
1190
 
         */
1191
 
        do {
1192
 
                /*
1193
 
                 *      Any thread can get the sigchild...
1194
 
                 */
1195
 
                if ((forkers[i].thread_id != NO_SUCH_CHILD_PID) &&
1196
 
                    (forkers[i].child_pid == pid)) {
1197
 
                        /*
1198
 
                         *      Save the status, THEN post the
1199
 
                         *      semaphore.
1200
 
                         */
1201
 
                        forkers[i].status = status;
1202
 
                        sem_post(&forkers[i].child_done);
1203
 
 
1204
 
                        /*
1205
 
                         *      FIXME: If the child is more than 60
1206
 
                         *      seconds out of date, then delete it.
1207
 
                         *
1208
 
                         *      That is, we've forked, and the forker
1209
 
                         *      is waiting nearly forever
1210
 
                         */
1211
 
                        return 0;
1212
 
                }
1213
 
 
1214
 
                i++;
1215
 
                i &= (NUM_FORKERS - 1);
1216
 
        } while (i != PID_2_ARRAY(pid));
1217
 
 
1218
 
        return -1;
1219
 
}
 
1036
        reap_children();        /* be nice to non-wait thingies */
 
1037
        return waitpid(pid, status, options);
 
1038
        
 
1039
}
 
1040
 
1220
1041
#endif