3
$Id: bchdrn.c,v 1.10 2000/12/05 21:23:42 cph Exp $
5
Copyright (c) 1991-2000 Massachusetts Institute of Technology
7
This program is free software; you can redistribute it and/or modify
8
it under the terms of the GNU General Public License as published by
9
the Free Software Foundation; either version 2 of the License, or (at
10
your option) any later version.
12
This program is distributed in the hope that it will be useful, but
13
WITHOUT ANY WARRANTY; without even the implied warranty of
14
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
General Public License for more details.
17
You should have received a copy of the GNU General Public License
18
along with this program; if not, write to the Free Software
19
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22
/* Drone program for overlapped I/O in bchscheme. */
27
/* ux.h includes <setjmp.h> indirectly */
35
extern char * EXFUN (error_name, (int));
36
extern int EXFUN (retrying_file_operation,
37
(ssize_t EXFUN ((*), (int, char *, int)),
38
int, char *, long, long, char *, char *, long *,
39
int EXFUN ((*), (char *, char *))));
41
#ifdef USE_SYSV_SHARED_MEMORY
46
char * file_name; /* gc file name */
47
int shmid; /* shared memory descriptor */
48
int tdron; /* total number of drones */
49
int nbuf; /* total number of buffers */
50
long bufsiz; /* size of each buffer in bytes */
51
int sdron; /* index of first drone to start */
52
int ndron; /* number of drones to start */
53
int keep_p; /* keep the gc file if Scheme dies? */
63
static char string_a[] = "%s";
64
#define STRING_FMT &string_a[0]
66
static char decimal_int_a[] = "%d";
67
#define DECIMAL_INT_FMT &decimal_int_a[0]
69
static char decimal_long_a[] = "%ld";
70
#define DECIMAL_LONG_FMT &decimal_long_a[0]
72
static struct argdesc command_line[] =
74
{ "program_name", STRING_FMT, &arguments.program_name },
75
{ "file_name", STRING_FMT, &arguments.file_name },
76
{ "shmid", DECIMAL_INT_FMT, &arguments.shmid },
77
{ "tdron", DECIMAL_INT_FMT, &arguments.tdron },
78
{ "nbuf", DECIMAL_INT_FMT, &arguments.nbuf },
79
{ "bufsiz", DECIMAL_LONG_FMT, &arguments.bufsiz },
80
{ "sdron", DECIMAL_INT_FMT, &arguments.sdron },
81
{ "ndron", DECIMAL_INT_FMT, &arguments.ndron },
82
{ "keep_gc_file", DECIMAL_INT_FMT, &arguments.keep_p }
85
static int gc_fid = -1;
86
static char * shared_memory;
87
static struct buffer_info * gc_buffers;
88
static struct drone_info * myself;
89
static unsigned long * drone_version, * wait_mask;
90
static jmp_buf abort_point;
91
static pid_t boss_pid;
93
static void EXFUN (kill_program, (int sig));
96
/* Install HANDLER as the action for signal SIGNUM using UX_sigaction.
   The action's sa_mask is emptied and then given SIGCONT and SIGQUIT,
   and sa_flags is set to SA_NOCLDSTOP.  If UX_sigaction returns -1, a
   diagnostic naming the program, this drone's index and the errno name
   is written to stderr.
   NOTE(review): this listing omits several original lines (the
   declaration of `new', the braces, and whatever follows the
   diagnostic), so the failure path after the fprintf is not visible.
   NOTE(review): the diagnostic dereferences `myself'; start_drones
   calls posix_signal at points where no assignment to `myself' is
   visible in this listing -- confirm it cannot be null there.  */
DEFUN (posix_signal, (signum, handler),
97
int signum AND void EXFUN ((*handler), ()))
101
new.sa_handler = handler;
102
UX_sigemptyset (&new.sa_mask);
103
UX_sigaddset ((&new.sa_mask), SIGCONT);
104
UX_sigaddset ((&new.sa_mask), SIGQUIT);
105
new.sa_flags = SA_NOCLDSTOP;
107
if ((UX_sigaction (signum, &new, 0)) == -1)
109
fprintf (stderr, "%s (%d, posix_signal): sigaction failed. errno = %s.\n",
110
arguments.program_name, myself->index, (error_name (errno)));
119
/* Signal handler; start_drones installs it for SIGHUP and SIGTERM.
   Marks this drone as drone_dead, detaches the shared memory segment,
   issues IPC_RMID on arguments.shmid, and unlinks the gc file unless
   arguments.keep_p is set.  SIG is not used by the visible statements.
   NOTE(review): original lines between and after these statements are
   missing from this listing, so any guards around them and the way
   the process terminates are not visible here.  */
DEFUN (kill_program, (sig), int sig)
121
myself->state = drone_dead;
124
shmdt (shared_memory);
127
shmctl (arguments.shmid, IPC_RMID, ((struct shmid_ds *) 0));
128
if (!arguments.keep_p)
129
unlink (arguments.file_name);
136
/* Signal handler; process_requests installs it for SIGQUIT.
   Re-installs itself through RE_INSTALL_HANDLER, records
   drone_aborting in this drone's state, and transfers control to the
   setjmp point saved in abort_point with value 1, so it does not
   return to the interrupted code.  */
DEFUN (abort_operation, (sig), int sig)
138
RE_INSTALL_HANDLER (SIGQUIT, abort_operation);
139
myself->state = drone_aborting;
140
longjmp (abort_point, 1);
145
/* Signal handler; process_requests installs it for SIGCONT.
   Re-installs itself through RE_INSTALL_HANDLER and jumps to the
   setjmp point saved in abort_point with value 1.  Unlike
   abort_operation it leaves the drone's state untouched.  */
DEFUN (continue_running, (sig), int sig)
147
RE_INSTALL_HANDLER (SIGCONT, continue_running);
148
longjmp (abort_point, 1);
153
DEFUN (always_one, (operation_name, noise),
154
char * operation_name AND char * noise)
160
DEFUN (process_requests, (drone), struct drone_info * drone)
162
sigset_t non_blocking_signal_mask, blocking_signal_mask;
163
int result, count, buffer_index, flags;
164
long current_position = -1;
165
struct timeval timeout;
166
struct stat file_info;
167
unsigned long read_mask, my_mask;
170
my_mask = (((unsigned long) 1) << drone->index);
171
drone->DRONE_PID = (getpid ());
172
gc_fid = (open (arguments.file_name, O_RDWR, 0644));
176
"%s (%d, process_requests): open failed. errno = %s.\n",
177
arguments.program_name, drone->index, (error_name (errno)));
179
if (drone->DRONE_PPID == boss_pid)
180
(void) (kill (boss_pid, SIGCONT));
185
printf ("%s (%d, process_requests): Starting (pid = %d, ppid = %d).\n",
186
arguments.program_name, drone->index,
187
drone->DRONE_PID, drone->DRONE_PPID);
190
if ((result = (fstat (gc_fid, &file_info))) == -1)
193
"%s (%d, process_requests): fstat failed. errno = %s.\n",
194
arguments.program_name, drone->index, (error_name (errno)));
197
/* Force O_SYNC only if we are dealing with a raw device. */
199
if ((result == -1) || ((file_info.st_mode & S_IFMT) == S_IFCHR))
201
if ((flags = (fcntl (gc_fid, F_GETFL, 0))) == -1)
205
"%s (%d, process_requests): fcntl (F_GETFL) failed. errno = %s.\n",
206
arguments.program_name, drone->index, (error_name (errno)));
212
if ((fcntl (gc_fid, F_SETFL, flags)) == -1)
216
"%s (%d, process_requests): fcntl (F_SETFL) failed. errno = %s.\n",
217
arguments.program_name, drone->index, (error_name (errno)));
223
UX_sigemptyset (&non_blocking_signal_mask);
224
UX_sigemptyset (&blocking_signal_mask);
225
UX_sigaddset ((&blocking_signal_mask), SIGCONT);
226
UX_sigaddset ((&blocking_signal_mask), SIGQUIT);
227
UX_sigprocmask (SIG_SETMASK, (&blocking_signal_mask), 0);
228
posix_signal (SIGQUIT, abort_operation);
229
posix_signal (SIGCONT, continue_running);
231
if ((setjmp (abort_point)) == 0)
233
count = drone->index;
234
drone->state = drone_idle;
235
if (drone->DRONE_PPID == boss_pid)
236
(void) (kill (boss_pid, SIGCONT));
246
UX_sigprocmask (SIG_SETMASK, (&non_blocking_signal_mask), 0);
247
result = (select (0, 0, 0, 0, &timeout));
248
UX_sigprocmask (SIG_SETMASK, (&blocking_signal_mask), 0);
250
if ((drone->state != drone_idle)
251
|| ((result == -1) && (errno == EINTR)))
256
"\n%s (%d, process_requests): request after timeout %d.\n",
257
arguments.program_name, drone->index, drone->state);
261
switch (drone->state)
265
"\n%s (%d, process_requests): Unknown/bad operation %d.\n",
266
arguments.program_name, drone->index, drone->state);
276
printf ("\n%s (%d, process_requests): Aborting.",
277
arguments.program_name, drone->index);
280
drone->buffer_index = -1;
281
current_position = -1;
287
/* Can't use buffer->bottom because the shared memory may be
288
attached at a different address!
292
enum drone_state operation;
293
char * operation_name, * buffer_address;
294
struct buffer_info * buffer;
295
struct gc_queue_entry * entry;
297
operation = drone->state;
298
buffer_index = (drone->buffer_index);
299
buffer = (gc_buffers + buffer_index);
301
entry = ((struct gc_queue_entry *)
302
(((char *) drone) + (drone->entry_offset)));
303
entry->error_code = 0;
305
operation_name = ((operation == drone_reading) ? "read" : "write");
306
buffer_address = (shared_memory + (arguments.bufsiz * buffer_index));
308
printf ("\n%s (%d, process_requests %s): Buffer index = %d.\n",
309
arguments.program_name, drone->index, operation_name,
311
printf ("\tBuffer address = 0x%lx; Position = 0x%lx; Size = 0x%lx.",
312
buffer_address, buffer->position, buffer->size);
316
UX_sigprocmask (SIG_SETMASK, (&non_blocking_signal_mask), 0);
317
result = (retrying_file_operation
318
(((operation == drone_reading) ? read : write),
319
gc_fid, buffer_address,
320
buffer->position, buffer->size, operation_name, NULL,
321
&current_position, always_one));
323
UX_sigprocmask (SIG_SETMASK, (&blocking_signal_mask), 0);
327
buffer->state = ((operation == drone_reading)
329
: buffer_write_error);
330
drone->buffer_index = -1;
331
entry->drone_index = -1;
332
entry->error_code = saved_errno;
333
entry->state = entry_error;
334
current_position = -1;
336
printf ("\n%s (%d, process_requests): %s error (errno = %s).\n",
337
arguments.program_name, drone->index, operation_name,
338
(error_name (saved_errno)));
345
buffer->state = ((operation == drone_reading)
348
drone->buffer_index = -1;
349
entry->drone_index = -1;
350
if (operation == drone_writing)
352
entry->retry_count = 0;
353
entry->state = entry_idle;
357
printf ("\n%s (%d, process_requests %s): Done.",
358
arguments.program_name, drone->index, operation_name);
366
drone->state = drone_idle;
367
read_mask = (* wait_mask);
368
if ((read_mask & my_mask) == my_mask)
369
(void) (kill (boss_pid, SIGCONT));
371
else if (result == 0)
373
if (count == arguments.tdron)
376
if ((kill (boss_pid, 0)) == -1)
379
read_mask = (* wait_mask);
380
if ((read_mask & my_mask) == my_mask)
383
"\n%s (%d, process_requests): signal deadlock (%s)!\n",
384
arguments.program_name, drone->index,
385
((read_mask == ((unsigned long) -1)) ? "any" : "me"));
387
drone->state = drone_idle; /* !! */
388
(void) (kill (boss_pid, SIGCONT));
395
/* Attach the shared memory segment arguments.shmid and start the
   drones assigned to this process.

   Layout derived from the attached address: the first
   arguments.nbuf * arguments.bufsiz bytes are buffer space; they are
   followed by arguments.nbuf `struct buffer_info' entries
   (gc_buffers), then arguments.tdron `struct drone_info' entries
   (gc_drones), then the drone version word (drone_version) and, right
   after it, the wait mask (wait_mask).

   If shmat fails, or the stored version word differs from
   DRONE_VERSION_NUMBER, a diagnostic is printed and SIGCONT is sent
   to boss_pid.  SIGINT and SIGQUIT are set to SIG_IGN, and SIGHUP and
   SIGTERM are routed to kill_program.

   A loop starting at drone index arguments.sdron + 1 forks children
   while counter < arguments.ndron; each child records this process's
   pid as its DRONE_PPID and calls process_requests.  This process
   then serves drone arguments.sdron itself with DRONE_PPID set to
   boss_pid, ignoring SIGCHLD first when more than one drone was
   requested.

   NOTE(review): this listing omits original lines, including the
   declarations of counter, cpid and my_pid, the loop's increment
   clause, the `fprintf (stderr,' openers of several diagnostics, and
   whatever follows each kill (boss_pid, SIGCONT) -- confirm the exit
   paths and the per-iteration advance of `drone' against the full
   source.  */
DEFUN_VOID (start_drones)
399
struct drone_info *gc_drones, *drone;
401
my_pid = (getpid ());
403
shared_memory = (shmat (arguments.shmid, ((char *) 0), 0));
404
if (shared_memory == ((char *) -1))
408
%s (start_drones): Unable to attach shared memory segment %d (errno = %s).\n",
409
arguments.program_name, arguments.shmid, (error_name (errno)));
412
kill (boss_pid, SIGCONT);
416
printf ("%s (start_drones): Attached shared memory at address = 0x%lx.\n",
417
arguments.program_name, ((long) shared_memory));
420
posix_signal (SIGINT, SIG_IGN);
421
posix_signal (SIGQUIT, SIG_IGN);
422
posix_signal (SIGHUP, kill_program);
423
posix_signal (SIGTERM, kill_program);
425
gc_buffers = ((struct buffer_info *)
426
(shared_memory + (arguments.nbuf * arguments.bufsiz)));
427
gc_drones = ((struct drone_info *) (gc_buffers + arguments.nbuf));
428
drone_version = ((unsigned long *) (gc_drones + arguments.tdron));
429
wait_mask = (drone_version + 1);
430
if ((* drone_version) != ((unsigned long) DRONE_VERSION_NUMBER))
433
"%s (start_drones): stored drone version != drone version.\n",
434
arguments.program_name);
435
fprintf (stderr, "\t*drone_version = %ld; DRONE_VERSION_NUMBER = %ld.\n",
436
(* drone_version), ((unsigned long) DRONE_VERSION_NUMBER));
438
kill (boss_pid, SIGCONT);
442
for (counter = 1, drone = (gc_drones + (arguments.sdron + 1));
443
counter < arguments.ndron;
446
if ((cpid = (fork ())) == 0)
448
drone->DRONE_PPID = my_pid;
449
process_requests (drone);
455
"%s (start_drones): fork failed; errno = %s.\n",
456
arguments.program_name, (error_name (errno)));
460
drone = (gc_drones + arguments.sdron);
461
drone->DRONE_PPID = boss_pid;
462
/* This is non-portable behavior to prevent zombies from being created. */
463
if (arguments.ndron != 1)
464
posix_signal (SIGCHLD, SIG_IGN);
465
process_requests (drone);
470
DEFUN (main, (argc, argv), int argc AND char ** argv)
473
static char err_buf[1024];
474
#if defined(DEBUG) || defined(DEBUG_1) || defined(DEBUG_2)
475
static char out_buf[1024];
477
setvbuf (stdout, &out_buf[0], _IOFBF, (sizeof (out_buf)));
479
setvbuf (stderr, &err_buf[0], _IOFBF, (sizeof (err_buf)));
482
printf ("%s (main): Arguments =\n", argv[0]);
483
for (count = 1; count < argc; count++)
484
printf ("\t%s\n", argv[count]);
488
nargs = ((sizeof (command_line)) / (sizeof (struct argdesc)));
489
boss_pid = (getppid ());
493
"%s (main): Wrong number of arguments (got %d, expected %d).\n",
494
argv[0], (argc - 1), (nargs - 1));
496
kill (boss_pid, SIGCONT);
499
for (count = 0; count < nargs; count++)
501
if (command_line[count].format == STRING_FMT)
502
(* ((char **) command_line[count].location)) = argv[count];
505
command_line[count].format,
506
command_line[count].location);
510
printf ("%s (main): Parsed arguments =\n", argv[0]);
511
for (count = 0; count < nargs; count++)
513
if (command_line[count].format == STRING_FMT)
514
printf ("\t%s\t= %s\n",
515
command_line[count].name,
516
(* ((char **) (command_line[count].location))));
518
printf ("\t%s\t= %d\n",
519
command_line[count].name,
520
(* ((int *) (command_line[count].location))));
532
#endif /* USE_SYSV_SHARED_MEMORY */
537
DEFUN (main, (argc, argv), int argc AND char ** argv)
539
fprintf (stderr, "%s: Not implemented.\n", (argv[0]));