~ubuntu-branches/ubuntu/trusty/ocamlnet/trusty

« back to all changes in this revision

Viewing changes to src/equeue/equeue_intro.txt

  • Committer: Bazaar Package Importer
  • Author(s): Stéphane Glondu
  • Date: 2011-09-02 14:12:33 UTC
  • mfrom: (18.2.3 sid)
  • Revision ID: james.westby@ubuntu.com-20110902141233-zbj0ygxb92u6gy4z
Tags: 3.4-1
* New upstream release
  - add a new NetcgiRequire directive to ease dependency management
    (Closes: #637147)
  - remove patches that were applied upstream:
    + Added-missing-shebang-lines-in-example-shell-scripts
    + Try-also-ocamlc-for-POSIX-threads

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
      {- {!Equeue_intro.eng_notify}}
25
25
      {- {!Equeue_intro.eng_async_ch}}
26
26
      {- {!Equeue_intro.eng_recv}}
 
27
      {- {!Equeue_intro.eng_uqio}}
27
28
      {- {!Equeue_intro.eng_eg}}
28
29
    }
29
30
  }
915
916
must return a second engine. The result of [seq_engine] is
916
917
the result of the second engine.
917
918
 
 
919
As [seq_engine] occurs frequently, there is a special operator
 
920
for it, [++]:
 
921
 
 
922
{[
 
923
  open Uq_engines.Operators
 
924
 
 
925
  let eng1 = connector addr1 ues in
 
926
  let eng123 = 
 
927
     eng1 ++
 
928
       (fun result1 ->
 
929
          let eng2 = connector addr2 ues in
 
930
          eng2 ++
 
931
            (fun result2 ->
 
932
               let eng3 = connector addr3 ues in
 
933
               eng3)))
 
934
]}
 
935
 
918
936
In these examples, we have called {!Unixqueue.run}
919
937
to start the event system. This function returns when all actions are
920
938
completed; this implies that finally all engines are synchronized again
927
945
  let eng1 = connector addr1 ues in
928
946
  let eng2 = connector addr2 ues in
929
947
  let eng12 = new sync_engine eng1 eng2 in
930
 
  let eng123 = new seq_engine
931
 
                 eng12
 
948
  let eng123 = eng12 ++
932
949
                 (fun result12 ->
933
950
                    let eng3 = connector addr3 ues in
934
951
                    eng3)
986
1003
           )
987
1004
]}
988
1005
 
989
 
Some more details: The callback function must be even
 
1006
Some more details: The callback function should be even
990
1007
called when only minor state changes occur, e.g. when
991
1008
[`Working n] changes to [`Working (n+1)].
992
1009
The engine is free to invoke the callback function even more
995
1012
Another detail: It is allowed that more callbacks are
996
1013
requested when a callback function is running.
997
1014
 
998
 
Note that all engine construction classes base on the
999
 
notification mechanism, so it is absolutely required that it
1000
 
is implemented error-free.
1001
 
 
1002
1015
 
1003
1016
 
1004
1017
{3:eng_async_ch Asynchronous channels}
1005
1018
 
 
1019
{b Editorial note:} This section describes a feature that is now
 
1020
seen as outdated, and often not the optimal way of doing async I/O.
 
1021
Asynchronous channels are still available, though. Readers may
 
1022
skip this section.
 
1023
 
1006
1024
Because engines are based on Unixqueues, one can imagine
1007
1025
that complex operations on file descriptors are executed by engines.
1008
1026
Actually, there is a primitive that copies the whole byte stream
1199
1217
 
1200
1218
{3:eng_recv Receivers}
1201
1219
 
 
1220
{b Editorial note:} This section describes a feature that is now
 
1221
seen as outdated, and often not the optimal way of doing async I/O.
 
1222
Asynchronous channels are still available, though. Readers may
 
1223
skip this section.
 
1224
 
1202
1225
The question is what one can do with asynchronous channels.
1203
1226
We have mentioned that these objects were designed with copy tasks
1204
1227
in mind that transfer data from file descriptors into data objects.
1227
1250
reports its state to the outer world. When the copy task has been
1228
1251
completed, it transitions into the state [`Done()].
1229
1252
 
1230
 
In the next section we present a real example that
1231
 
also uses the receiver class.
1232
 
    </sect1>
1233
 
 
 
1253
 
 
1254
 
 
1255
{3:eng_uqio The I/O functions in {!Uq_io}}
 
1256
 
 
1257
The functions in {!Uq_io} are patterned after the I/O functions
 
1258
in the standard library, only that they use the engine paradigm.
 
1259
For example, we have in [Pervasives]
 
1260
 
 
1261
{[
 
1262
val input : in_channel -> string -> int -> int -> int
 
1263
]}
 
1264
 
 
1265
for reading data from an [in_channel] and putting it into the string,
 
1266
and the corresponding function {!Uq_io.input_e} has the signature
 
1267
 
 
1268
{[
 
1269
val input_e : [< in_device ] -> string_like -> int -> int -> 
 
1270
                int Uq_engines.engine
 
1271
]}
 
1272
 
 
1273
Instead from an [in_channel] it gets data from an [in_device]. There
 
1274
are several kinds of {!Uq_io.in_device}, especially:
 
1275
 
 
1276
- [`Polldescr(fd_style, fd, esys)] is a device reading data from
 
1277
  the file descriptor [fd] via the event queue [esys]. The [fd_style]
 
1278
  indicates how to read data (whether it is [Unix.read], [Unix.recv],
 
1279
  or another system call).
 
1280
- [`Buffer_in b] is a device reading data from a buffer [b], and
 
1281
  [b] is in turn connected to another device serving as data source.
 
1282
  Buffers [b] are created with {!Uq_io.create_in_buffer}.
 
1283
 
 
1284
The type [string_like] allows the values [`String s] for a string [s],
 
1285
and [`Memory m] for a bigarray of chars [m].
 
1286
 
 
1287
There is, of course, also an {!Uq_io.out_device} for the other
 
1288
data flow direction. I/O functions include:
 
1289
 
 
1290
- {!Uq_io.input_e}
 
1291
- {!Uq_io.really_input_e}
 
1292
- {!Uq_io.input_line_e} (however, only for buffer-backed devices)
 
1293
- {!Uq_io.output_e}
 
1294
- {!Uq_io.really_output_e}
 
1295
- {!Uq_io.output_string_e}
 
1296
- {!Uq_io.output_memory_e}
 
1297
- {!Uq_io.output_netbuffer_e}
 
1298
- {!Uq_io.flush_e}
 
1299
- {!Uq_io.write_eof_e} and {!Uq_io.shutdown_e} for socket shutdown and
 
1300
  normal close
 
1301
- {!Uq_io.inactivate} for an unconditional close (w/o prior flush)
 
1302
 
 
1303
For example, let's develop a function reading line-by-line from a
 
1304
file descriptor to check whether the special line "pearl" exists:
 
1305
 
 
1306
{[
 
1307
let find_pearl fd esys =
 
1308
  let d1 = `Polldescr(Netsys.get_fd_style fd, fd, eys) in
 
1309
  let d2 = `Buffer_in(Uq_io.create_in_buffer d1) in
 
1310
  let found = ref false in
 
1311
 
 
1312
  let rec loop () =
 
1313
    Uq_io.input_line_e d2 ++
 
1314
      (fun line ->
 
1315
        if line = "pearl" then found := true;
 
1316
        loop()
 
1317
      ) in
 
1318
 
 
1319
  Uq_engines.map_engine
 
1320
    ~map_done:(fun _ -> `Done !found)
 
1321
    ~map_error:(fun err -> 
 
1322
                  if err = End_of_file then `Done !found else `Error err)
 
1323
    (loop())
 
1324
]}
 
1325
 
 
1326
The result type of [find_pearl] is [bool engine].
 
1327
 
 
1328
We exploit here that [input_line_e] raises [End_of_file] when the end
 
1329
of the input stream is reached. This exception is, of course, not directly
 
1330
raised, but rather the engine state [`Error End_of_file] is entered.
 
1331
Because of this, there is no test for the end of the recursion in [loop].
 
1332
The exception is caught by a [map_engine], and mapped to a regular
 
1333
result.
1234
1334
 
1235
1335
 
1236
1336
{3:eng_eg Example: A simple HTTP client}
1280
1380
]}
1281
1381
 
1282
1382
Now we are interested in the moment when the connection is
1283
 
established. In this moment, we set up an 
1284
 
[output_async_descr] object that copies
1285
 
its contents to the connection, so we can asynchronously
1286
 
send our HTTP request. Furthermore, we create an
 
1383
established. In this moment, we send the request using
 
1384
{!Uq_io.output_string_e}. Furthermore, we create an
1287
1385
[async_buffer] object that collects the
1288
1386
HTTP response, which can arrive at any time from now on.
1289
1387
 
1290
1388
{[
 
1389
  let e =
 
1390
    c ++ 
 
1391
      (fun connstat ->
 
1392
         match connstat with
 
1393
          | `Socket(fd, _) ->
 
1394
               prerr_endline "CONNECTED";     (* debug output *)
 
1395
               let d = `Polldescr(Netsys.get_fd_style fd, fd, ues) in
 
1396
               Uq_io.output_string_e d "GET / HTTP/1.0\n\n" ++
 
1397
                 (fun () ->
 
1398
                    Uq_io.write_eof_e d ++
 
1399
                      (fun _ ->
 
1400
                        let buffer = new async_buffer b in
 
1401
                        new receiver ~src:fd ~dst:buffer ues
 
1402
                      )
 
1403
                 )
 
1404
          | _ -> assert false
 
1405
      ) in
 
1406
 
1291
1407
  when_state
1292
 
    ~is_done:(fun connstat ->
1293
 
                match connstat with
1294
 
                    `Socket(fd, _) ->
1295
 
                      prerr_endline "CONNECTED";
1296
 
                      let printer = new output_async_descr ~dst:fd ues in
1297
 
                      let buffer = new async_buffer b in
1298
 
                      let receiver = new receiver ~src:fd ~dst:buffer ues in
1299
 
                      let s = "GET / HTTP/1.0\n\n" in
1300
 
                      ignore(printer # output s 0 (String.length s));
1301
 
                      when_state
1302
 
                        ~is_done:(fun _ ->
1303
 
                                    prerr_endline "HTTP RESPONSE RECEIVED!")
1304
 
                        ~is_error:(fun _ ->
1305
 
                                    prerr_endline "ERROR!")
1306
 
                        receiver
1307
 
                  | _ -> assert false
1308
 
             )
1309
 
    c
1310
 
  ;;
 
1408
    ~is_done:(fun _ ->
 
1409
                prerr_endline "HTTP RESPONSE RECEIVED!")
 
1410
    ~is_error:(fun _ ->
 
1411
                prerr_endline "ERROR!")
 
1412
    e
1311
1413
]}
1312
1414
 
1313
 
Some details: We can ignore the result of [printer#output]
1314
 
because the [printer] has unlimited capacity (the
1315
 
default of [output_async_descr] channels). Because
1316
 
[printer] is not closed, this channel does not
1317
 
close the destination descriptor [fd] (which would
1318
 
be fatal). The [receiver], however, closes the file
1319
 
descriptor when it finds the end of the input stream.
1320
 
 
1321
1415
One important line is missing: Up to now we have only
1322
1416
set up the client, but it is not yet running. To invoke it we need:
1323
1417
 
1327
1421
 
1328
1422
This client is not perfect, not only, because it is restricted
1329
1423
to the most basic form of the HTTP protocol. The error handling could
1330
 
be better: In the case that [printer] transitions to
1331
 
error state, it will close [fd]. But the file descriptor
1332
 
is in use by [receiver] at the same time, causing
1333
 
a followup error that is finally reported. A better solution would
1334
 
use a duplicate of the file descriptor for [receiver],
1335
 
so both engines can independently close their descriptors. Furthermore,
1336
 
the error state of [printer] would be trapped.
 
1424
be better: The descriptor [fd] is not closed in this case.
1337
1425
 
1338
1426
 
1339
1427