915
916
must return a second engine. The result of [seq_engine] is
916
917
the result of the second engine.
919
As [seq_engine] occurs frequently, there is a special operator
923
open Uq_engines.Operators
925
let eng1 = connector addr1 ues in
929
let eng2 = connector addr2 ues in
932
let eng3 = connector addr3 ues in
918
936
In these examples, we have called {!Unixqueue.run}
919
937
to start the event system. This function returns when all actions are
920
938
completed; this implies that finally all engines are synchronized again
995
1012
Another detail: It is allowed that more callbacks are
996
1013
requested when a callback function is running.
998
Note that all engine construction classes base on the
999
notification mechanism, so it is absolutely required that it
1000
is implemented error-free.
1004
1017
{3:eng_async_ch Asynchronous channels}
1019
{b Editorial note:} This section describes a feature that is now
1020
seen as outdated, and often not the optimal way of doing async I/O.
1021
Asynchronous channels are still available, though. Readers may
1006
1024
Because engines are based on Unixqueues, one can imagine
1007
1025
that complex operations on file descriptors are executed by engines.
1008
1026
Actually, there is a primitive that copies the whole byte stream
1227
1250
reports its state to the outer world. When the copy task has been
1228
1251
completed, it transitions into the state [`Done()].
1230
In the next section we present a real example that
1231
also uses the receiver class.
1255
{3:eng_uqio The I/O functions in {!Uq_io}}
1257
The functions in {!Uq_io} are patterned after the I/O functions
1258
in the standard library, only that they use the engine paradigm.
1259
For example, we have in [Pervasives]
1262
val input : in_channel -> string -> int -> int -> int
1265
for reading data from an [in_channel] and putting it into the string,
1266
and the corresponding function {!Uq_io.input_e} has the signature
1269
val input_e : [< in_device ] -> string_like -> int -> int ->
1270
int Uq_engines.engine
1273
Instead from an [in_channel] it gets data from an [in_device]. There
1274
are several kinds of {!Uq_io.in_device}, especially:
1276
- [`Polldescr(fd_style, fd, esys)] is a device reading data from
1277
the file descriptor [fd] via the event queue [esys]. The [fd_style]
1278
indicates how to read data (whether it is [Unix.read], [Unix.recv],
1279
or another system call).
1280
- [`Buffer_in b] is a device reading data from a buffer [b], and
1281
[b] is in turn connected to another device serving as data source.
1282
Buffers [b] are created with {!Uq_io.create_in_buffer}.
1284
The type [string_like] allows the values [`String s] for a string [s],
1285
and [`Memory m] for a bigarray of chars [m].
1287
There is, of course, also an {!Uq_io.out_device} for the other
1288
data flow direction. I/O functions include:
1291
- {!Uq_io.really_input_e}
1292
- {!Uq_io.input_line_e} (however, only for buffer-backed devices)
1294
- {!Uq_io.really_output_e}
1295
- {!Uq_io.output_string_e}
1296
- {!Uq_io.output_memory_e}
1297
- {!Uq_io.output_netbuffer_e}
1299
- {!Uq_io.write_eof_e} and {!Uq_io.shutdown_e} for socket shutdown and
1301
- {!Uq_io.inactivate} for an unconditional close (w/o prior flush)
1303
For example, let's develop a function reading line-by-line from a
1304
file descriptor to check whether the special line "pearl" exists:
1307
let find_pearl fd esys =
1308
let d1 = `Polldescr(Netsys.get_fd_style fd, fd, eys) in
1309
let d2 = `Buffer_in(Uq_io.create_in_buffer d1) in
1310
let found = ref false in
1313
Uq_io.input_line_e d2 ++
1315
if line = "pearl" then found := true;
1319
Uq_engines.map_engine
1320
~map_done:(fun _ -> `Done !found)
1321
~map_error:(fun err ->
1322
if err = End_of_file then `Done !found else `Error err)
1326
The result type of [find_pearl] is [bool engine].
1328
We exploit here that [input_line_e] raises [End_of_file] when the end
1329
of the input stream is reached. This exception is, of course, not directly
1330
raised, but rather the engine state [`Error End_of_file] is entered.
1331
Because of this, there is no test for the end of the recursion in [loop].
1332
The exception is caught by a [map_engine], and mapped to a regular
1236
1336
{3:eng_eg Example: A simple HTTP client}
1282
1382
Now we are interested in the moment when the connection is
1283
established. In this moment, we set up an
1284
[output_async_descr] object that copies
1285
its contents to the connection, so we can asynchronously
1286
send our HTTP request. Furthermore, we create an
1383
established. In this moment, we send the request using
1384
{!Uq_io.output_string_e}. Furthermore, we create an
1287
1385
[async_buffer] object that collects the
1288
1386
HTTP response, which can arrive at any time from now on.
1394
prerr_endline "CONNECTED"; (* debug output *)
1395
let d = `Polldescr(Netsys.get_fd_style fd, fd, ues) in
1396
Uq_io.output_string_e d "GET / HTTP/1.0\n\n" ++
1398
Uq_io.write_eof_e d ++
1400
let buffer = new async_buffer b in
1401
new receiver ~src:fd ~dst:buffer ues
1292
~is_done:(fun connstat ->
1295
prerr_endline "CONNECTED";
1296
let printer = new output_async_descr ~dst:fd ues in
1297
let buffer = new async_buffer b in
1298
let receiver = new receiver ~src:fd ~dst:buffer ues in
1299
let s = "GET / HTTP/1.0\n\n" in
1300
ignore(printer # output s 0 (String.length s));
1303
prerr_endline "HTTP RESPONSE RECEIVED!")
1305
prerr_endline "ERROR!")
1409
prerr_endline "HTTP RESPONSE RECEIVED!")
1411
prerr_endline "ERROR!")
1313
Some details: We can ignore the result of [printer#output]
1314
because the [printer] has unlimited capacity (the
1315
default of [output_async_descr] channels). Because
1316
[printer] is not closed, this channel does not
1317
close the destination descriptor [fd] (which would
1318
be fatal). The [receiver], however, closes the file
1319
descriptor when it finds the end of the input stream.
1321
1415
One important line is missing: Up to now we have only
1322
1416
set up the client, but it is not yet running. To invoke it we need:
1328
1422
This client is not perfect, not only, because it is restricted
1329
1423
to the most basic form of the HTTP protocol. The error handling could
1330
be better: In the case that [printer] transitions to
1331
error state, it will close [fd]. But the file descriptor
1332
is in use by [receiver] at the same time, causing
1333
a followup error that is finally reported. A better solution would
1334
use a duplicate of the file descriptor for [receiver],
1335
so both engines can independently close their descriptors. Furthermore,
1336
the error state of [printer] would be trapped.
1424
be better: The descriptor [fd] is not closed in this case.