1
/******************************************************
2
The interface to the operating system file i/o primitives
6
Created 10/21/1995 Heikki Tuuri
7
*******************************************************/
15
/* We assume in this case that the OS has standard Posix aio (at least SunOS
16
2.6, HP-UX 11i and AIX 4.3 have) */
18
#undef __USE_FILE_OFFSET64
23
/* We use these mutexes to protect lseek + file i/o operation, if the
24
OS does not provide an atomic pread or pwrite, or similar */
25
#define OS_FILE_N_SEEK_MUTEXES 16
26
os_mutex_t os_file_seek_mutexes[OS_FILE_N_SEEK_MUTEXES];
28
/* In simulated aio, merge at most this many consecutive i/os */
29
#define OS_AIO_MERGE_N_CONSECUTIVE 32
31
/* If this flag is TRUE, then we will use the native aio of the
32
OS (provided we compiled Innobase with it in), otherwise we will
33
use simulated aio we build below with threads */
35
ibool os_aio_use_native_aio = FALSE;
37
/* The aio array slot structure */
38
typedef struct os_aio_slot_struct os_aio_slot_t;
40
struct os_aio_slot_struct{
41
ibool is_read; /* TRUE if a read operation */
42
ulint pos; /* index of the slot in the aio
44
ibool reserved; /* TRUE if this slot is reserved */
45
ulint len; /* length of the block to read or
47
byte* buf; /* buffer used in i/o */
48
ulint type; /* OS_FILE_READ or OS_FILE_WRITE */
49
ulint offset; /* 32 low bits of file offset in
51
ulint offset_high; /* 32 high bits of file offset */
52
os_file_t file; /* file where to read or write */
53
char* name; /* file name or path */
54
ibool io_already_done;/* used only in simulated aio:
55
TRUE if the physical i/o already
56
made and only the slot message
57
needs to be passed to the caller
58
of os_aio_simulated_handle */
59
void* message1; /* message which is given by the */
60
void* message2; /* the requester of an aio operation
61
and which can be used to identify
62
which pending aio operation was
65
OVERLAPPED control; /* Windows control block for the
67
#elif defined(POSIX_ASYNC_IO)
68
struct aiocb control; /* Posix control block for aio
73
/* The aio array structure */
74
typedef struct os_aio_array_struct os_aio_array_t;
76
struct os_aio_array_struct{
77
os_mutex_t mutex; /* the mutex protecting the aio array */
78
os_event_t not_full; /* The event which is set to signaled
79
state when there is space in the aio
80
outside the ibuf segment */
81
ulint n_slots; /* Total number of slots in the aio array.
82
This must be divisible by n_segments. */
83
ulint n_segments;/* Number of segments in the aio array of
84
pending aio requests. A thread can wait
85
separately for any one of the segments. */
86
ulint n_reserved;/* Number of reserved slots in the
87
aio array outside the ibuf segment */
88
os_aio_slot_t* slots; /* Pointer to the slots in the array */
89
os_event_t* events; /* Pointer to an array of event handles
90
where we copied the handles from slots,
91
in the same order. This can be used in
92
WaitForMultipleObjects; used only in
96
/* Array of events used in simulated aio */
97
os_event_t* os_aio_segment_wait_events = NULL;
99
/* The aio arrays for non-ibuf i/o, ibuf i/o and log i/o, as well as sync aio. These
100
are NULL when the module has not yet been initialized. */
101
os_aio_array_t* os_aio_read_array = NULL;
102
os_aio_array_t* os_aio_write_array = NULL;
103
os_aio_array_t* os_aio_ibuf_array = NULL;
104
os_aio_array_t* os_aio_log_array = NULL;
105
os_aio_array_t* os_aio_sync_array = NULL;
107
ulint os_aio_n_segments = ULINT_UNDEFINED;
109
/***************************************************************************
110
Retrieves the last error number if an error occurs in a file io function.
111
The number should be retrieved before any other OS calls (because they may
112
overwrite the error number). If the number is not known to this program,
113
the OS error number + 100 is returned. */
116
os_file_get_last_error(void)
117
/*========================*/
118
/* out: error number, or OS error number + 100 */
124
err = (ulint) GetLastError();
126
if (err == ERROR_FILE_NOT_FOUND) {
127
return(OS_FILE_NOT_FOUND);
128
} else if (err == ERROR_DISK_FULL) {
129
return(OS_FILE_DISK_FULL);
130
} else if (err == ERROR_FILE_EXISTS) {
131
return(OS_FILE_ALREADY_EXISTS);
138
if (err == ENOSPC ) {
139
return(OS_FILE_DISK_FULL);
140
#ifdef POSIX_ASYNC_IO
141
} else if (err == EAGAIN) {
142
return(OS_FILE_AIO_RESOURCES_RESERVED);
144
} else if (err == ENOENT) {
145
return(OS_FILE_NOT_FOUND);
146
} else if (err == EEXIST) {
147
return(OS_FILE_ALREADY_EXISTS);
154
/********************************************************************
155
Does error handling when a file operation fails. If we have run out
156
of disk space, then the user can clean the disk. If we do not find
157
a specified file, then the user can copy it to disk. */
160
os_file_handle_error(
161
/*=================*/
162
/* out: TRUE if we should retry the operation */
163
os_file_t file, /* in: file pointer */
164
char* name) /* in: name of a file or NULL */
169
err = os_file_get_last_error();
171
if (err == OS_FILE_DISK_FULL) {
176
"Innobase encountered a problem with file %s.\n",
179
printf("Disk is full. Try to clean the disk to free space\n");
180
printf("before answering the following: How to continue?\n");
181
printf("(Y == freed some space: try again)\n");
182
printf("(N == crash the database: will restart it)?\n");
183
ask_with_no_question:
184
input_char = getchar();
186
if (input_char == (int) 'N') {
190
} else if (input_char == (int) 'Y') {
193
} else if (input_char == (int) '\n') {
195
goto ask_with_no_question;
199
} else if (err == OS_FILE_AIO_RESOURCES_RESERVED) {
209
/********************************************************************
210
Opens an existing file or creates a new one. */
215
/* out, own: handle to the file, not defined if error,
216
error number can be retrieved with os_file_get_last_error */
217
char* name, /* in: name of the file or path as a null-terminated
219
ulint create_mode, /* in: OS_FILE_OPEN if an existing file is opened
220
(if does not exist, error), or OS_FILE_CREATE if a new
221
file is created (if exists, error), OS_FILE_OVERWRITE
222
if a new file is created or an old one is overwritten */
223
ulint purpose,/* in: OS_FILE_AIO, if asynchronous, non-buffered i/o
224
is desired, OS_FILE_NORMAL, if any normal file */
225
ibool* success)/* out: TRUE if succeed, FALSE if error */
236
if (create_mode == OS_FILE_OPEN) {
237
create_flag = OPEN_EXISTING;
238
} else if (create_mode == OS_FILE_CREATE) {
239
create_flag = CREATE_NEW;
240
} else if (create_mode == OS_FILE_OVERWRITE) {
241
create_flag = CREATE_ALWAYS;
247
if (purpose == OS_FILE_AIO) {
248
/* use asynchronous (overlapped) io and no buffering
249
of writes in the OS */
252
if (os_aio_use_native_aio) {
253
attributes = attributes | FILE_FLAG_OVERLAPPED;
256
#ifdef UNIV_NON_BUFFERED_IO
257
attributes = attributes | FILE_FLAG_NO_BUFFERING;
259
} else if (purpose == OS_FILE_NORMAL) {
261
#ifdef UNIV_NON_BUFFERED_IO
262
| FILE_FLAG_NO_BUFFERING
270
file = CreateFile(name,
271
GENERIC_READ | GENERIC_WRITE, /* read and write
273
FILE_SHARE_READ,/* file can be read by other
275
NULL, /* default security attributes */
278
NULL); /* no template file */
280
if (file == INVALID_HANDLE_VALUE) {
283
if (create_mode != OS_FILE_OPEN
284
&& os_file_get_last_error() == OS_FILE_DISK_FULL) {
286
retry = os_file_handle_error(file, name);
305
if (create_mode == OS_FILE_OPEN) {
306
create_flag = O_RDWR;
307
} else if (create_mode == OS_FILE_CREATE) {
308
create_flag = O_RDWR | O_CREAT | O_EXCL;
309
} else if (create_mode == OS_FILE_OVERWRITE) {
310
create_flag = O_RDWR | O_CREAT | O_TRUNC;
316
UT_NOT_USED(purpose);
318
if (create_mode == OS_FILE_CREATE) {
320
file = open(name, create_flag, S_IRWXU | S_IRWXG | S_IRWXO);
322
file = open(name, create_flag);
328
if (create_mode != OS_FILE_OPEN
329
&& errno == ENOSPC) {
331
retry = os_file_handle_error(file, name);
345
/***************************************************************************
346
Closes a file handle. In case of error, error number can be retrieved with
347
os_file_get_last_error. */
352
/* out: TRUE if success */
353
os_file_t file) /* in, own: handle to a file */
360
ret = CloseHandle(file);
380
/***************************************************************************
386
/* out: TRUE if success */
387
os_file_t file, /* in: handle to a file */
388
ulint* size, /* out: least significant 32 bits of file
390
ulint* size_high)/* out: most significant 32 bits of size */
396
low = GetFileSize(file, &high);
398
if ((low == 0xFFFFFFFF) && (GetLastError() != NO_ERROR)) {
407
*size = (ulint) lseek(file, 0, SEEK_END);
414
/***************************************************************************
415
Sets a file size. This function can be used to extend or truncate a file. */
420
/* out: TRUE if success */
421
char* name, /* in: name of the file or path as a
422
null-terminated string */
423
os_file_t file, /* in: handle to a file */
424
ulint size, /* in: least significant 32 bits of file
426
ulint size_high)/* in: most significant 32 bits of size */
437
buf = ut_malloc(UNIV_PAGE_SIZE * 64);
439
/* Write buffer full of zeros */
440
for (i = 0; i < UNIV_PAGE_SIZE * 64; i++) {
446
#if (UNIV_WORD_SIZE == 8)
447
low = low + (size_high << 32);
449
while (offset < low) {
450
if (low - offset < UNIV_PAGE_SIZE * 64) {
451
n_bytes = low - offset;
453
n_bytes = UNIV_PAGE_SIZE * 64;
456
ret = os_file_write(name, file, buf, offset, 0, n_bytes);
467
ret = os_file_flush(file);
474
retry = os_file_handle_error(file, name);
483
/***************************************************************************
484
Flushes the write buffers of a given file to the disk. */
489
/* out: TRUE if success */
490
os_file_t file) /* in, own: handle to a file */
497
ret = FlushFileBuffers(file);
519
/***********************************************************************
520
Does a synchronous read operation in Posix. */
525
/* out: number of bytes read, -1 if error */
526
os_file_t file, /* in: handle to a file */
527
void* buf, /* in: buffer where to read */
528
ulint n, /* in: number of bytes to read */
529
ulint offset) /* in: offset from where to read */
532
return(pread(file, buf, n, (off_t) offset));
537
/* Protect the seek / read operation with a mutex */
538
i = ((ulint) file) % OS_FILE_N_SEEK_MUTEXES;
540
os_mutex_enter(os_file_seek_mutexes[i]);
542
ret = lseek(file, (off_t) offset, 0);
545
os_mutex_exit(os_file_seek_mutexes[i]);
550
ret = read(file, buf, n);
552
os_mutex_exit(os_file_seek_mutexes[i]);
558
/***********************************************************************
559
Does a synchronous write operation in Posix. */
564
/* out: number of bytes written, -1 if error */
565
os_file_t file, /* in: handle to a file */
566
void* buf, /* in: buffer from where to write */
567
ulint n, /* in: number of bytes to write */
568
ulint offset) /* in: offset where to write */
571
return(pwrite(file, buf, n, (off_t) offset));
576
/* Protect the seek / write operation with a mutex */
577
i = ((ulint) file) % OS_FILE_N_SEEK_MUTEXES;
579
os_mutex_enter(os_file_seek_mutexes[i]);
581
ret = lseek(file, (off_t) offset, 0);
584
os_mutex_exit(os_file_seek_mutexes[i]);
589
ret = write(file, buf, n);
591
os_mutex_exit(os_file_seek_mutexes[i]);
598
/***********************************************************************
599
Requests a synchronous positioned read operation. */
604
/* out: TRUE if request was
605
successful, FALSE if fail */
606
os_file_t file, /* in: handle to a file */
607
void* buf, /* in: buffer where to read */
608
ulint offset, /* in: least significant 32 bits of file
609
offset where to read */
610
ulint offset_high, /* in: most significant 32 bits of
612
ulint n) /* in: number of bytes to read */
632
/* Protect the seek / read operation with a mutex */
633
i = ((ulint) file) % OS_FILE_N_SEEK_MUTEXES;
635
os_mutex_enter(os_file_seek_mutexes[i]);
637
ret2 = SetFilePointer(file, low, &high, FILE_BEGIN);
639
if (ret2 == 0xFFFFFFFF && GetLastError() != NO_ERROR) {
640
err = GetLastError();
642
os_mutex_exit(os_file_seek_mutexes[i]);
647
ret = ReadFile(file, buf, n, &len, NULL);
649
os_mutex_exit(os_file_seek_mutexes[i]);
651
if (ret && len == n) {
655
err = GetLastError();
661
#if (UNIV_WORD_SIZE == 8)
662
offset = offset + (offset_high << 32);
665
/* Protect the seek / read operation with a mutex */
666
i = ((ulint) file) % OS_FILE_N_SEEK_MUTEXES;
668
os_mutex_enter(os_file_seek_mutexes[i]);
670
ret = os_file_pread(file, buf, n, (off_t) offset);
673
os_mutex_exit(os_file_seek_mutexes[i]);
679
retry = os_file_handle_error(file, NULL);
690
/***********************************************************************
691
Requests a synchronous write operation. */
696
/* out: TRUE if request was
697
successful, FALSE if fail */
698
char* name, /* in: name of the file or path as a
699
null-terminated string */
700
os_file_t file, /* in: handle to a file */
701
void* buf, /* in: buffer from which to write */
702
ulint offset, /* in: least significant 32 bits of file
703
offset where to write */
704
ulint offset_high, /* in: most significant 32 bits of
706
ulint n) /* in: number of bytes to write */
726
/* Protect the seek / write operation with a mutex */
727
i = ((ulint) file) % OS_FILE_N_SEEK_MUTEXES;
729
os_mutex_enter(os_file_seek_mutexes[i]);
731
ret2 = SetFilePointer(file, low, &high, FILE_BEGIN);
733
if (ret2 == 0xFFFFFFFF && GetLastError() != NO_ERROR) {
734
err = GetLastError();
736
os_mutex_exit(os_file_seek_mutexes[i]);
741
ret = WriteFile(file, buf, n, &len, NULL);
743
os_mutex_exit(os_file_seek_mutexes[i]);
745
if (ret && len == n) {
752
#if (UNIV_WORD_SIZE == 8)
753
offset = offset + (offset_high << 32);
756
ret = pwrite(file, buf, n, (off_t) offset);
764
retry = os_file_handle_error(file, name);
775
/********************************************************************
776
Returns a pointer to the nth slot in the aio array. */
779
os_aio_array_get_nth_slot(
780
/*======================*/
781
/* out: pointer to slot */
782
os_aio_array_t* array, /* in: aio array */
783
ulint index) /* in: index of the slot */
785
ut_a(index < array->n_slots);
787
return((array->slots) + index);
790
/****************************************************************************
791
Creates an aio wait array. */
796
/* out, own: aio array */
797
ulint n, /* in: maximum number of pending aio operations
798
allowed; n must be divisible by n_segments */
799
ulint n_segments) /* in: number of segments in the aio array */
801
os_aio_array_t* array;
808
ut_a(n_segments > 0);
809
ut_a(n % n_segments == 0);
811
array = ut_malloc(sizeof(os_aio_array_t));
813
array->mutex = os_mutex_create(NULL);
814
array->not_full = os_event_create(NULL);
816
array->n_segments = n_segments;
817
array->n_reserved = 0;
818
array->slots = ut_malloc(n * sizeof(os_aio_slot_t));
819
array->events = ut_malloc(n * sizeof(os_event_t));
821
for (i = 0; i < n; i++) {
822
slot = os_aio_array_get_nth_slot(array, i);
825
slot->reserved = FALSE;
827
over = &(slot->control);
829
over->hEvent = os_event_create(NULL);
831
*((array->events) + i) = over->hEvent;
838
/****************************************************************************
839
Initializes the asynchronous io system. Creates separate aio arrays for
840
non-ibuf read and write, a third aio array for the ibuf i/o, with just one
841
segment, a fourth aio array for log reads and writes with one segment, and a
842
synchronous aio array of the specified size. The combined number of segments
843
in the four first aio arrays is the parameter n_segments given to the
844
function. The caller must create an i/o handler thread for each segment in
845
the four first arrays, but not for the sync aio array. */
850
ulint n, /* in: maximum number of pending aio operations
851
allowed; n must be divisible by n_segments */
852
ulint n_segments, /* in: combined number of segments in the four
853
first aio arrays; must be >= 4 */
854
ulint n_slots_sync) /* in: number of slots in the sync aio array */
860
#ifdef POSIX_ASYNC_IO
863
ut_ad(n % n_segments == 0);
864
ut_ad(n_segments >= 4);
866
n_per_seg = n / n_segments;
867
n_write_segs = (n_segments - 2) / 2;
868
n_read_segs = n_segments - 2 - n_write_segs;
870
/* printf("Array n per seg %lu\n", n_per_seg); */
872
os_aio_read_array = os_aio_array_create(n_read_segs * n_per_seg,
874
os_aio_write_array = os_aio_array_create(n_write_segs * n_per_seg,
876
os_aio_ibuf_array = os_aio_array_create(n_per_seg, 1);
878
os_aio_log_array = os_aio_array_create(n_per_seg, 1);
880
os_aio_sync_array = os_aio_array_create(n_slots_sync, 1);
882
os_aio_n_segments = n_segments;
886
for (i = 0; i < OS_FILE_N_SEEK_MUTEXES; i++) {
887
os_file_seek_mutexes[i] = os_mutex_create(NULL);
890
os_aio_segment_wait_events = ut_malloc(n_segments * sizeof(void*));
892
for (i = 0; i < n_segments; i++) {
893
os_aio_segment_wait_events[i] = os_event_create(NULL);
896
#ifdef POSIX_ASYNC_IO
897
/* Block aio signals from the current thread and its children:
898
for this to work, the current thread must be the first created
899
in the database, so that all its children will inherit its
902
sigemptyset(&sigset);
903
sigaddset(&sigset, SIGRTMIN + 1 + 0);
904
sigaddset(&sigset, SIGRTMIN + 1 + 1);
905
sigaddset(&sigset, SIGRTMIN + 1 + 2);
906
sigaddset(&sigset, SIGRTMIN + 1 + 3);
908
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
912
/**************************************************************************
913
Calculates segment number for a slot. */
916
os_aio_get_segment_no_from_slot(
917
/*============================*/
918
/* out: segment number (which is the number
919
used by, for example, i/o-handler threads) */
920
os_aio_array_t* array, /* in: aio wait array */
921
os_aio_slot_t* slot) /* in: slot in this array */
926
if (array == os_aio_ibuf_array) {
929
} else if (array == os_aio_log_array) {
932
} else if (array == os_aio_read_array) {
933
seg_len = os_aio_read_array->n_slots /
934
os_aio_read_array->n_segments;
936
segment = 2 + slot->pos / seg_len;
938
ut_a(array == os_aio_write_array);
939
seg_len = os_aio_write_array->n_slots /
940
os_aio_write_array->n_segments;
942
segment = os_aio_read_array->n_segments + 2
943
+ slot->pos / seg_len;
949
/**************************************************************************
950
Calculates local segment number and aio array from global segment number. */
953
os_aio_get_array_and_local_segment(
954
/*===============================*/
955
/* out: local segment number within
957
os_aio_array_t** array, /* out: aio wait array */
958
ulint global_segment)/* in: global segment number */
962
ut_a(global_segment < os_aio_n_segments);
964
if (global_segment == 0) {
965
*array = os_aio_ibuf_array;
968
} else if (global_segment == 1) {
969
*array = os_aio_log_array;
972
} else if (global_segment < os_aio_read_array->n_segments + 2) {
973
*array = os_aio_read_array;
975
segment = global_segment - 2;
977
*array = os_aio_write_array;
979
segment = global_segment - (os_aio_read_array->n_segments + 2);
985
/***********************************************************************
986
Gets an integer value designating a specified aio array. This is used
987
to give numbers to signals in Posix aio. */
992
os_aio_array_t* array) /* in: aio array */
994
if (array == os_aio_ibuf_array) {
998
} else if (array == os_aio_log_array) {
1002
} else if (array == os_aio_read_array) {
1005
} else if (array == os_aio_write_array) {
1015
/***********************************************************************
1016
Gets the aio array for its number. */
1019
os_aio_get_array_from_no(
1020
/*=====================*/
1021
/* out: aio array */
1022
ulint n) /* in: array number */
1025
return(os_aio_ibuf_array);
1026
} else if (n == 1) {
1028
return(os_aio_log_array);
1029
} else if (n == 2) {
1031
return(os_aio_read_array);
1032
} else if (n == 3) {
1034
return(os_aio_write_array);
1042
/***********************************************************************
1043
Requests a slot in the aio array. If no slot is available, waits until
1044
not_full-event becomes signaled. */
1047
os_aio_array_reserve_slot(
1048
/*======================*/
1049
/* out: pointer to slot */
1050
ulint type, /* in: OS_FILE_READ or OS_FILE_WRITE */
1051
os_aio_array_t* array, /* in: aio array */
1052
void* message1,/* in: message to be passed along with
1053
the aio operation */
1054
void* message2,/* in: message to be passed along with
1055
the aio operation */
1056
os_file_t file, /* in: file handle */
1057
char* name, /* in: name of the file or path as a
1058
null-terminated string */
1059
void* buf, /* in: buffer where to read or from which
1061
ulint offset, /* in: least significant 32 bits of file
1063
ulint offset_high, /* in: most significant 32 bits of
1065
ulint len) /* in: length of the block to read or write */
1067
os_aio_slot_t* slot;
1069
OVERLAPPED* control;
1071
#elif defined(POSIX_ASYNC_IO)
1073
struct aiocb* control;
1077
os_mutex_enter(array->mutex);
1079
if (array->n_reserved == array->n_slots) {
1080
os_mutex_exit(array->mutex);
1082
if (!os_aio_use_native_aio) {
1083
/* If the handler threads are suspended, wake them
1084
so that we get more slots */
1086
os_aio_simulated_wake_handler_threads();
1089
os_event_wait(array->not_full);
1095
slot = os_aio_array_get_nth_slot(array, i);
1097
if (slot->reserved == FALSE) {
1102
array->n_reserved++;
1104
if (array->n_reserved == array->n_slots) {
1105
os_event_reset(array->not_full);
1108
slot->reserved = TRUE;
1109
slot->message1 = message1;
1110
slot->message2 = message2;
1116
slot->offset = offset;
1117
slot->offset_high = offset_high;
1118
slot->io_already_done = FALSE;
1121
control = &(slot->control);
1122
control->Offset = (DWORD)offset;
1123
control->OffsetHigh = (DWORD)offset_high;
1124
os_event_reset(control->hEvent);
1126
#elif defined(POSIX_ASYNC_IO)
1128
#if (UNIV_WORD_SIZE == 8)
1129
offset = offset + (offset_high << 32);
1131
ut_a(offset_high == 0);
1133
control = &(slot->control);
1134
control->aio_fildes = file;
1135
control->aio_buf = buf;
1136
control->aio_nbytes = len;
1137
control->aio_offset = offset;
1138
control->aio_reqprio = 0;
1139
control->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
1140
control->aio_sigevent.sigev_signo =
1141
SIGRTMIN + 1 + os_aio_get_array_no(array);
1142
/* TODO: How to choose the signal numbers? */
1144
printf("AIO signal number %lu\n", (ulint) control->aio_sigevent.sigev_signo);
1146
control->aio_sigevent.sigev_value.sival_ptr = slot;
1148
os_mutex_exit(array->mutex);
1153
/***********************************************************************
1154
Frees a slot in the aio array. */
1157
os_aio_array_free_slot(
1158
/*===================*/
1159
os_aio_array_t* array, /* in: aio array */
1160
os_aio_slot_t* slot) /* in: pointer to slot */
1165
os_mutex_enter(array->mutex);
1167
ut_ad(slot->reserved);
1169
slot->reserved = FALSE;
1171
array->n_reserved--;
1173
if (array->n_reserved == array->n_slots - 1) {
1174
os_event_set(array->not_full);
1178
os_event_reset(slot->control.hEvent);
1180
os_mutex_exit(array->mutex);
1183
/**************************************************************************
1184
Wakes up a simulated aio i/o-handler thread if it has something to do. */
1187
os_aio_simulated_wake_handler_thread(
1188
/*=================================*/
1189
ulint global_segment) /* in: the number of the segment in the aio
1192
os_aio_array_t* array;
1194
os_aio_slot_t* slot;
1198
ut_ad(!os_aio_use_native_aio);
1200
segment = os_aio_get_array_and_local_segment(&array, global_segment);
1202
n = array->n_slots / array->n_segments;
1204
/* Look through n slots after the segment * n'th slot */
1206
os_mutex_enter(array->mutex);
1208
for (i = 0; i < n; i++) {
1209
slot = os_aio_array_get_nth_slot(array, i + segment * n);
1211
if (slot->reserved) {
1212
/* Found an i/o request */
1218
os_mutex_exit(array->mutex);
1221
os_event_set(os_aio_segment_wait_events[global_segment]);
1225
/**************************************************************************
1226
Wakes up simulated aio i/o-handler threads if they have something to do. */
1229
os_aio_simulated_wake_handler_threads(void)
1230
/*=======================================*/
1234
if (os_aio_use_native_aio) {
1235
/* We do not use simulated aio: do nothing */
1240
for (i = 0; i < os_aio_n_segments; i++) {
1241
os_aio_simulated_wake_handler_thread(i);
1245
/***********************************************************************
1246
Requests an asynchronous i/o operation. */
1251
/* out: TRUE if request was queued
1252
successfully, FALSE if fail */
1253
ulint type, /* in: OS_FILE_READ or OS_FILE_WRITE */
1254
ulint mode, /* in: OS_AIO_NORMAL, ..., possibly ORed
1255
to OS_AIO_SIMULATED_WAKE_LATER: the
1256
last flag advises this function not to wake
1257
i/o-handler threads, but the caller will
1258
do the waking explicitly later, in this
1259
way the caller can post several requests in
1260
a batch; NOTE that the batch must not be
1261
so big that it exhausts the slots in aio
1262
arrays! NOTE that a simulated batch
1263
may introduce hidden chances of deadlocks,
1264
because i/os are not actually handled until
1265
all have been posted: use with great
1267
char* name, /* in: name of the file or path as a
1268
null-terminated string */
1269
os_file_t file, /* in: handle to a file */
1270
void* buf, /* in: buffer where to read or from which
1272
ulint offset, /* in: least significant 32 bits of file
1273
offset where to read or write */
1274
ulint offset_high, /* in: most significant 32 bits of
1276
ulint n, /* in: number of bytes to read or write */
1277
void* message1,/* in: messages for the aio handler (these
1278
can be used to identify a completed aio
1279
operation); if mode is OS_AIO_SYNC, these
1283
os_aio_array_t* array;
1284
os_aio_slot_t* slot;
1298
ut_ad(n % OS_FILE_LOG_BLOCK_SIZE == 0);
1299
ut_ad((ulint)buf % OS_FILE_LOG_BLOCK_SIZE == 0)
1300
ut_ad(offset % OS_FILE_LOG_BLOCK_SIZE == 0);
1301
ut_ad(os_aio_validate());
1303
wake_later = mode & OS_AIO_SIMULATED_WAKE_LATER;
1304
mode = mode & (~OS_AIO_SIMULATED_WAKE_LATER);
1306
if (mode == OS_AIO_SYNC
1308
&& !os_aio_use_native_aio
1311
/* This is actually an ordinary synchronous read or write:
1312
no need to use an i/o-handler thread. NOTE that if we use
1313
Windows async i/o, Windows does not allow us to use
1314
ordinary synchronous os_file_read etc. on the same file,
1315
therefore we have built a special mechanism for synchronous
1316
wait in the Windows case. */
1318
if (type == OS_FILE_READ) {
1319
return(os_file_read(file, buf, offset, offset_high, n));
1322
ut_a(type == OS_FILE_WRITE);
1324
return(os_file_write(name, file, buf, offset, offset_high, n));
1328
if (mode == OS_AIO_NORMAL) {
1329
if (type == OS_FILE_READ) {
1330
array = os_aio_read_array;
1332
array = os_aio_write_array;
1334
} else if (mode == OS_AIO_IBUF) {
1335
ut_ad(type == OS_FILE_READ);
1337
array = os_aio_ibuf_array;
1338
} else if (mode == OS_AIO_LOG) {
1340
array = os_aio_log_array;
1341
} else if (mode == OS_AIO_SYNC) {
1342
array = os_aio_sync_array;
1347
slot = os_aio_array_reserve_slot(type, array, message1, message2, file,
1348
name, buf, offset, offset_high, n);
1349
if (type == OS_FILE_READ) {
1350
if (os_aio_use_native_aio) {
1352
ret = ReadFile(file, buf, (DWORD)n, &len,
1354
#elif defined(POSIX_ASYNC_IO)
1355
slot->control.aio_lio_opcode = LIO_READ;
1356
err = (ulint) aio_read(&(slot->control));
1357
printf("Starting Posix aio read %lu\n", err);
1361
os_aio_simulated_wake_handler_thread(
1362
os_aio_get_segment_no_from_slot(array, slot));
1365
} else if (type == OS_FILE_WRITE) {
1366
if (os_aio_use_native_aio) {
1368
ret = WriteFile(file, buf, (DWORD)n, &len,
1370
#elif defined(POSIX_ASYNC_IO)
1371
slot->control.aio_lio_opcode = LIO_WRITE;
1372
err = (ulint) aio_write(&(slot->control));
1373
printf("Starting Posix aio write %lu\n", err);
1377
os_aio_simulated_wake_handler_thread(
1378
os_aio_get_segment_no_from_slot(array, slot));
1386
if (os_aio_use_native_aio) {
1387
if ((ret && len == n)
1388
|| (!ret && GetLastError() == ERROR_IO_PENDING)) {
1390
/* aio was queued successfully! */
1392
if (mode == OS_AIO_SYNC) {
1393
/* We want a synchronous i/o operation on a file
1394
where we also use async i/o: in Windows we must
1395
use the same wait mechanism as for async i/o */
1397
return(os_aio_windows_handle(ULINT_UNDEFINED,
1399
&dummy_mess1, &dummy_mess2));
1405
goto error_handling;
1409
/* aio was queued successfully! */
1415
os_aio_array_free_slot(array, slot);
1417
retry = os_file_handle_error(file, name);
1430
/**************************************************************************
1431
This function is only used in Windows asynchronous i/o.
1432
Waits for an aio operation to complete. This function is used to wait
1433
for completed requests. The aio array of pending requests is divided
1434
into segments. The thread specifies which segment or slot it wants to wait
1435
for. NOTE: this function will also take care of freeing the aio slot,
1436
therefore no other thread is allowed to do the freeing! */
1439
os_aio_windows_handle(
1440
/*==================*/
1441
/* out: TRUE if the aio operation succeeded */
1442
ulint segment, /* in: the number of the segment in the aio
1443
arrays to wait for; segment 0 is the ibuf
1444
i/o thread, segment 1 the log i/o thread,
1445
then follow the non-ibuf read threads, and as
1446
the last are the non-ibuf write threads; if
1447
this is ULINT_UNDEFINED, then it means that
1448
sync aio is used, and this parameter is
1450
ulint pos, /* this parameter is used only in sync aio:
1451
wait for the aio slot at this position */
1452
void** message1, /* out: the messages passed with the aio
1453
request; note that also in the case where
1454
the aio operation failed, these output
1455
parameters are valid and can be used to
1456
restart the operation, for example */
1459
os_aio_array_t* array;
1460
os_aio_slot_t* slot;
1468
if (segment == ULINT_UNDEFINED) {
1469
array = os_aio_sync_array;
1472
segment = os_aio_get_array_and_local_segment(&array, segment);
1475
/* NOTE! We only access constant fields in os_aio_array. Therefore
1476
we do not have to acquire the protecting mutex yet */
1478
ut_ad(os_aio_validate());
1479
ut_ad(segment < array->n_segments);
1481
n = array->n_slots / array->n_segments;
1483
if (array == os_aio_sync_array) {
1484
ut_ad(pos < array->n_slots);
1485
os_event_wait(array->events[pos]);
1488
i = os_event_wait_multiple(n, (array->events) + segment * n);
1491
os_mutex_enter(array->mutex);
1493
slot = os_aio_array_get_nth_slot(array, i + segment * n);
1495
ut_a(slot->reserved);
1497
ret = GetOverlappedResult(slot->file, &(slot->control), &len, TRUE);
1499
*message1 = slot->message1;
1500
*message2 = slot->message2;
1502
if (ret && len == slot->len) {
1505
err = GetLastError();
1511
os_mutex_exit(array->mutex);
1513
os_aio_array_free_slot(array, slot);
1519
#ifdef POSIX_ASYNC_IO
1521
/**************************************************************************
1522
This function is only used in Posix asynchronous i/o. Waits for an aio
1523
operation to complete. The wait is a sigwaitinfo() call on the signal
SIGRTMIN + 1 + array_no, and the slot is read from the si_value.sival_ptr
field of the siginfo that sigwaitinfo() fills in.
NOTE(review): the return type, several local declarations and the closing
lines of the function are not visible in this listing. */
1526
os_aio_posix_handle(
1527
/*================*/
1528
/* out: TRUE if the aio operation succeeded */
1529
ulint array_no, /* in: array number 0 - 3 */
1530
void** message1, /* out: the messages passed with the aio
1531
request; note that also in the case where
1532
the aio operation failed, these output
1533
parameters are valid and can be used to
1534
restart the operation, for example */
1537
os_aio_array_t* array;
1538
os_aio_slot_t* slot;
1541
sigset_t proc_sigset;
1542
sigset_t thr_sigset;
1547
/* Build a signal set that holds only the signal used for this array, and
unblock that signal with pthread_sigmask() */
sigemptyset(&sigset);
1548
sigaddset(&sigset, SIGRTMIN + 1 + array_no);
1550
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
1553
/* NOTE(review): the statements below only read the current process and
thread signal masks and print the membership of signals 32..39. This
looks like debugging output - confirm whether it is meant to be compiled
in (the surrounding lines are not visible in this listing) */
sigprocmask(0, NULL, &proc_sigset);
1554
pthread_sigmask(0, NULL, &thr_sigset);
1556
for (i = 32 ; i < 40; i++) {
1557
printf("%lu : %lu %lu\n", (ulint)i,
1558
(ulint)sigismember(&proc_sigset, i),
1559
(ulint)sigismember(&thr_sigset, i));
1563
/* Wait for a signal of the set; its details are stored in info */
ret = sigwaitinfo(&sigset, &info);
1565
/* NOTE(review): the result of sigwaitinfo() is stored in ret, but the
test below reads sig, for which no assignment is visible in this
listing - confirm that the intended variable is compared */
if (sig != SIGRTMIN + 1 + array_no) {
1572
printf("Handling Posix aio\n");
1574
array = os_aio_get_array_from_no(array_no);
1576
os_mutex_enter(array->mutex);
1578
/* The slot pointer is taken from the value carried by the signal
(presumably attached when the request was submitted - confirm against
the code that issues the request) */
slot = info.si_value.sival_ptr;
1580
ut_a(slot->reserved);
1582
*message1 = slot->message1;
1583
*message2 = slot->message2;
1585
os_mutex_exit(array->mutex);
1587
/* The slot is freed only after array->mutex has been released */
os_aio_array_free_slot(array, slot);
1593
/**************************************************************************
1594
Does simulated aio. This function should be called by an i/o-handler
thread. */
1598
/* Summary of the statements visible in this listing: while holding
array->mutex the slots of the segment are scanned for the reserved slot
with the lowest slot->offset, and further reserved slots that continue it
on the same file, type and offset_high are collected into
consecutive_ios[]. The mutex is released while a single os_file_write() or
os_file_read() call transfers total_len bytes; afterwards every collected
slot is marked io_already_done, the messages of one slot are passed out
through message1 and message2, and that slot is freed.
NOTE(review): the return type, several declarations, labels, else lines,
return statements and closing braces are not visible in this listing, and
a few of the original comments are cut off before their terminator. */
os_aio_simulated_handle(
1599
/*====================*/
1600
/* out: TRUE if the aio operation succeeded */
1601
ulint global_segment, /* in: the number of the segment in the aio
1602
arrays to wait for; segment 0 is the ibuf
1603
i/o thread, segment 1 the log i/o thread,
1604
then follow the non-ibuf read threads, and as
1605
the last are the non-ibuf write threads */
1606
void** message1, /* out: the messages passed with the aio
1607
request; note that also in the case where
1608
the aio operation failed, these output
1609
parameters are valid and can be used to
1610
restart the operation, for example */
1613
os_aio_array_t* array;
1615
os_aio_slot_t* slot;
1616
os_aio_slot_t* slot2;
1617
os_aio_slot_t* consecutive_ios[OS_AIO_MERGE_N_CONSECUTIVE];
1618
ulint n_consecutive;
1621
ulint lowest_offset;
1627
/* os_aio_get_array_and_local_segment() fills in array, and its return
value is kept in segment; global_segment itself is still used further
down to index os_aio_segment_wait_events */
segment = os_aio_get_array_and_local_segment(&array, global_segment);
1630
/* Give other threads chance to add several i/os to the array
1635
/* NOTE! We only access constant fields in os_aio_array. Therefore
1636
we do not have to acquire the protecting mutex yet */
1638
ut_ad(os_aio_validate());
1639
ut_ad(segment < array->n_segments);
1641
/* n is the number of slots in each segment of the array */
n = array->n_slots / array->n_segments;
1643
/* Look through n slots after the segment * n'th slot */
1645
os_mutex_enter(array->mutex);
1647
/* Check if there is a slot for which the i/o has already been
1650
for (i = 0; i < n; i++) {
1651
slot = os_aio_array_get_nth_slot(array, i + segment * n);
1653
if (slot->reserved && slot->io_already_done) {
1663
/* Look for an i/o request at the lowest offset in the array */
1665
/* Only slot->offset is compared in this search; offset_high is not
consulted here */
lowest_offset = ULINT_MAX;
1667
for (i = 0; i < n; i++) {
1668
slot = os_aio_array_get_nth_slot(array, i + segment * n);
1670
if (slot->reserved && slot->offset < lowest_offset) {
1672
/* Found an i/o request */
1673
consecutive_ios[0] = slot;
1677
lowest_offset = slot->offset;
1681
if (n_consecutive == 0) {
1683
/* No i/o requested at the moment */
1688
slot = consecutive_ios[0];
1690
/* Check if there are several consecutive blocks to read or write */
1693
/* A slot qualifies when it is reserved, is not the current slot, starts
exactly where the current slot ends, and has the same offset_high, type
and file as the current slot */
for (i = 0; i < n; i++) {
1694
slot2 = os_aio_array_get_nth_slot(array, i + segment * n);
1696
if (slot2->reserved && slot2 != slot
1697
&& slot2->offset == slot->offset + slot->len
1698
&& slot->offset + slot->len > slot->offset /* check that
1699
sum does not wrap over */
1700
&& slot2->offset_high == slot->offset_high
1701
&& slot2->type == slot->type
1702
&& slot2->file == slot->file) {
1704
/* Found a consecutive i/o request */
1706
consecutive_ios[n_consecutive] = slot2;
1711
/* Search again for a further consecutive request only while the merge
limit has not been reached (the consecutive_loop label itself is not
visible in this listing) */
if (n_consecutive < OS_AIO_MERGE_N_CONSECUTIVE) {
1713
goto consecutive_loop;
1720
/* We have now collected n_consecutive i/o requests in the array;
1721
allocate a single buffer which can hold all data, and perform the
1725
slot = consecutive_ios[0];
1727
for (i = 0; i < n_consecutive; i++) {
1728
total_len += consecutive_ios[i]->len;
1731
if (n_consecutive == 1) {
1732
/* We can use the buffer of the i/o request */
1733
combined_buf = slot->buf;
1735
/* A buffer of total_len bytes is allocated here (presumably the else
branch; the else line is not visible); it is released further down
when n_consecutive > 1 */
combined_buf = ut_malloc(total_len);
1740
/* We release the array mutex for the time of the i/o: NOTE that
1741
this assumes that there is just one i/o-handler thread serving
1742
a single segment of slots! */
1744
os_mutex_exit(array->mutex);
1746
if (slot->type == OS_FILE_WRITE && n_consecutive > 1) {
1747
/* Copy the buffers to the combined buffer */
1750
for (i = 0; i < n_consecutive; i++) {
1752
ut_memcpy(combined_buf + offs, consecutive_ios[i]->buf,
1753
consecutive_ios[i]->len);
1754
offs += consecutive_ios[i]->len;
1758
/* Do the i/o with ordinary, synchronous i/o functions: */
1759
if (slot->type == OS_FILE_WRITE) {
1760
ret = os_file_write(slot->name, slot->file, combined_buf,
1761
slot->offset, slot->offset_high, total_len);
1763
ret = os_file_read(slot->file, combined_buf,
1764
slot->offset, slot->offset_high, total_len);
1769
/* printf("aio: %lu consecutive %lu:th segment, first offs %lu blocks\n",
1770
n_consecutive, global_segment, slot->offset
1771
/ UNIV_PAGE_SIZE); */
1773
if (slot->type == OS_FILE_READ && n_consecutive > 1) {
1774
/* Copy the combined buffer to individual buffers */
1777
for (i = 0; i < n_consecutive; i++) {
1779
ut_memcpy(consecutive_ios[i]->buf, combined_buf + offs,
1780
consecutive_ios[i]->len);
1781
offs += consecutive_ios[i]->len;
1785
/* Release the temporary merge buffer; with a single request combined_buf
is the request's own buffer and is left alone */
if (n_consecutive > 1) {
1786
ut_free(combined_buf);
1789
os_mutex_enter(array->mutex);
1791
/* Mark the i/os done in slots */
1793
for (i = 0; i < n_consecutive; i++) {
1794
consecutive_ios[i]->io_already_done = TRUE;
1797
/* We return the messages for the first slot now, and if there were
1798
several slots, the messages will be returned with subsequent calls
1803
ut_a(slot->reserved);
1805
*message1 = slot->message1;
1806
*message2 = slot->message2;
1808
os_mutex_exit(array->mutex);
1810
os_aio_array_free_slot(array, slot);
1815
/* We wait here until there again can be i/os in the segment
1818
os_event_reset(os_aio_segment_wait_events[global_segment]);
1820
os_mutex_exit(array->mutex);
1822
os_event_wait(os_aio_segment_wait_events[global_segment]);
1827
/**************************************************************************
1828
Validates the consistency of an aio array. While holding array->mutex it
asserts that n_slots and n_segments are positive, that every reserved slot
has len > 0, and that array->n_reserved equals the local n_reserved
counter.
NOTE(review): the return type, the statement that advances n_reserved, the
return statement and the closing braces are not visible in this listing. */
1831
os_aio_array_validate(
1832
/*==================*/
1833
/* out: TRUE if ok */
1834
os_aio_array_t* array) /* in: aio wait array */
1836
os_aio_slot_t* slot;
1837
ulint n_reserved = 0;
1842
os_mutex_enter(array->mutex);
1844
ut_a(array->n_slots > 0);
1845
ut_a(array->n_segments > 0);
1847
/* Visit every slot of the array */
for (i = 0; i < array->n_slots; i++) {
1848
slot = os_aio_array_get_nth_slot(array, i);
1850
if (slot->reserved) {
1852
ut_a(slot->len > 0);
1856
/* The count kept in the array must agree with the local counter */
ut_a(array->n_reserved == n_reserved);
1858
os_mutex_exit(array->mutex);
1863
/**************************************************************************
1864
Validates the consistency of the aio system by calling
os_aio_array_validate() on the read, write, ibuf, log and sync arrays, in
that order. The return values of those calls are not inspected here.
NOTE(review): the return type, the return statement and the braces are not
visible in this listing. */
1867
os_aio_validate(void)
1868
/*=================*/
1869
/* out: TRUE if ok */
1871
os_aio_array_validate(os_aio_read_array);
1872
os_aio_array_validate(os_aio_write_array);
1873
os_aio_array_validate(os_aio_ibuf_array);
1874
os_aio_array_validate(os_aio_log_array);
1875
os_aio_array_validate(os_aio_sync_array);
1880
/**************************************************************************
1881
Prints info of the aio arrays. For an array it prints a heading, one line
with the two message pointers of each reserved slot, and the local
n_reserved total, all while holding array->mutex; it also asserts that
n_slots and n_segments are positive, that each reserved slot has len > 0
and that array->n_reserved equals n_reserved.
NOTE(review): the line with the function name, some declarations, the
statement that advances n_reserved and the jumps that repeat the body for
the next array are not visible in this listing. */
1887
os_aio_array_t* array;
1888
os_aio_slot_t* slot;
1892
/* Start with the read array */
array = os_aio_read_array;
1896
printf("INFO OF AN AIO ARRAY\n");
1898
os_mutex_enter(array->mutex);
1900
ut_a(array->n_slots > 0);
1901
ut_a(array->n_segments > 0);
1905
for (i = 0; i < array->n_slots; i++) {
1906
slot = os_aio_array_get_nth_slot(array, i);
1908
if (slot->reserved) {
1910
/* NOTE(review): message1 and message2 are declared void* in the slot
struct but are printed with %lx - confirm this matches the argument
type on every target platform */
printf("Reserved slot, messages %lx %lx\n",
1911
slot->message1, slot->message2);
1912
ut_a(slot->len > 0);
1916
ut_a(array->n_reserved == n_reserved);
1918
printf("Total of %lu reserved aio slots\n", n_reserved);
1920
os_mutex_exit(array->mutex);
1922
/* Advance to the next array in the order read, write, ibuf, log, sync
(presumably each branch jumps back to repeat the printing; the jump
statements are not visible here) */
if (array == os_aio_read_array) {
1923
array = os_aio_write_array;
1928
if (array == os_aio_write_array) {
1929
array = os_aio_ibuf_array;
1934
if (array == os_aio_ibuf_array) {
1935
array = os_aio_log_array;
1940
if (array == os_aio_log_array) {
1941
array = os_aio_sync_array;
1947
/**************************************************************************
1948
Checks that all slots in the system have been freed, that is, there are
1949
no pending io operations. */
1952
os_aio_all_slots_free(void)
1953
/*=======================*/
1954
/* out: TRUE if all free */
1956
os_aio_array_t* array;
1959
array = os_aio_read_array;
1961
os_mutex_enter(array->mutex);
1963
n_res += array->n_reserved;
1965
os_mutex_exit(array->mutex);
1967
array = os_aio_write_array;
1969
os_mutex_enter(array->mutex);
1971
n_res += array->n_reserved;
1973
os_mutex_exit(array->mutex);
1975
array = os_aio_ibuf_array;
1977
os_mutex_enter(array->mutex);
1979
n_res += array->n_reserved;
1981
os_mutex_exit(array->mutex);
1983
array = os_aio_log_array;
1985
os_mutex_enter(array->mutex);
1987
n_res += array->n_reserved;
1989
os_mutex_exit(array->mutex);
1991
array = os_aio_sync_array;
1993
os_mutex_enter(array->mutex);
1995
n_res += array->n_reserved;
1997
os_mutex_exit(array->mutex);