SimGrid
Token Ring example

This example implements the token ring algorithm. It involves several nodes arranged in a ring (each of them has a left and a right neighbour) that exchange a "token". This algorithm is one of the solutions for ensuring mutual exclusion between distributed processes. There is only one token at any time, so the process holding it is guaranteed to be the only one that has it. So, if there is an action that all processes must perform in turn, but that two processes must never perform at the same time, let the process holding the token perform it.
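The mutual exclusion argument can be illustrated without any networking. The following is only a minimal sketch in plain C, not part of the GRAS example: the name simulate_token_ring, the RING_SIZE constant and the work_done array are assumptions made for this illustration. At each hop, only the current holder performs its action, then the token moves to the right neighbour.

```c
#include <assert.h>
#include <stddef.h>

#define RING_SIZE 5   /* hypothetical ring size (five hosts, as in the deployment file) */

/* Simulate `hops` token transfers around a static ring.
 * work_done[i] counts how many times node i performed its action.
 * Returns the index of the node holding the token at the end. */
int simulate_token_ring(int hops, int work_done[RING_SIZE])
{
  int holder = 0;               /* assumption: node 0 creates the token */

  assert(hops >= 0);

  for (size_t i = 0; i < RING_SIZE; i++)
    work_done[i] = 0;

  for (int h = 0; h < hops; h++) {
    /* Only the holder acts: there is a single token, so at most
       one node is in this branch at any time */
    work_done[holder] += 1;
    /* Pass the token to the right neighbour */
    holder = (holder + 1) % RING_SIZE;
  }
  return holder;
}
```

Since there is a single holder variable, two nodes can never act during the same hop, which is exactly the property the token provides in the distributed setting.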

Actually, there are many different token ring algorithms in the literature; this example implements the simplest one. The ring is static (no new node can join it, and you will run into trouble if a node dies or leaves), and nothing is done to handle the loss of the token.

1) Deployment file

Here is the deployment file:

<?xml version='1.0'?>
<!DOCTYPE platform SYSTEM "http://simgrid.gforge.inria.fr/simgrid.dtd">
<platform version="3">

 <process host="Tremblay" function="node">
  <argument value="4000"/>                           <!-- port on which I am listening -->
  <argument value="Fafard"/><argument value="4000"/> <!-- peer (successor) host id and port-->
  <argument value="--create-token"/>                 <!-- I'm the first node, i.e. I have to create the token -->
 </process>
 
 <process host="Fafard" function="node">
  <argument value="4000"/>                           <!-- port on which I am listening -->
  <argument value="Jupiter"/><argument value="4000"/><!-- peer (successor) host id and port-->
 </process>
 
 <process host="Jupiter" function="node">
  <argument value="4000"/>                            <!-- port on which I am listening -->
  <argument value="Ginette"/><argument value="4000"/> <!-- peer (successor) host id and port-->
 </process>
 
 <process host="Ginette" function="node">
  <argument value="4000"/>                            <!-- port on which I am listening -->
  <argument value="Bourassa"/><argument value="4000"/><!-- peer (successor) host id and port-->
 </process>
 
 <process host="Bourassa" function="node">
  <argument value="4000"/>                            <!-- port on which I am listening -->
  <argument value="Tremblay"/><argument value="4000"/><!-- peer (successor) host id and port-->
 </process>
 
</platform>

The neighbour of each node is given at startup as command-line arguments. Moreover, one of the nodes is instructed by a specific argument (the one on Tremblay here) to create the token at the beginning of the algorithm.
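To make the argument layout concrete, here is a small sketch of how such arguments could be decoded. It is only an illustration: the node_config_t type and the parse_node_args function are hypothetical names, not part of GRAS. It also compares the flag with strcmp (exact match), whereas the listing in section 4 uses a prefix comparison with strncmp.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical container for one node's settings (not a GRAS type) */
typedef struct {
  int myport;            /* port on which this node listens */
  const char *peerhost;  /* host name of the successor */
  int peerport;          /* port of the successor */
  int create;            /* non-zero if this node creates the token */
} node_config_t;

/* Fill cfg from arguments laid out as in the deployment file:
 *   argv[1] = my port, argv[2] = peer host, argv[3] = peer port,
 *   argv[4] = optional "--create-token".
 * Defaults are kept when fewer than three arguments are given. */
void parse_node_args(int argc, char *argv[], node_config_t *cfg)
{
  assert(cfg != NULL);

  cfg->myport = 4000;
  cfg->peerhost = "127.0.0.1";
  cfg->peerport = 4000;
  cfg->create = 0;

  if (argc >= 4) {
    cfg->myport = atoi(argv[1]);
    cfg->peerhost = argv[2];
    cfg->peerport = atoi(argv[3]);
  }
  if (argc >= 5 && strcmp(argv[4], "--create-token") == 0)
    cfg->create = 1;
}
```

With the arguments declared for Tremblay, this sketch yields port 4000, peer Fafard:4000 and the create flag set; for the other hosts the flag stays at 0.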

2) Global definition

The token is embodied by a specific message that circulates from node to node (the payload is an integer incremented at each hop). The most important part of the code is therefore the message callback, which forwards the message to the next node. Because the callback needs the node's state, all the variables have to be stored in a global structure, as explained in the Globals section.

typedef struct {
  gras_socket_t sock;           /* server socket on which I hear */
  int remaining_loop;           /* number of loops to do until done */
  int create;                   /* whether I have to create the token */
  gras_socket_t tosuccessor;    /* how to connect to the successor on ring */
  double start_time;            /* to measure the elapsed time. Only used by the 
                                   node that creates the token */
} node_data_t;

3) The callback

Although this function is the core of the algorithm, it is quite straightforward.

static int node_cb_stoken_handler(gras_msg_cb_ctx_t ctx, void *payload)
{
  /* 1. Get the payload into the msg variable, and retrieve my caller */
  int msg = *(int *) payload;
  gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);


  /* 2. Retrieve the node's state (globals) */
  node_data_t *globals = (node_data_t *) gras_userdata_get();

  /* 3. Log which predecessor connected */
  int supersteps = 1;           /* only log every superstep loop */
  if (NBLOOPS >= 1000) {
    supersteps = 100;
  } else if (NBLOOPS >= 100) {
    supersteps = 10;
  } else if (NBLOOPS <= 10) {
    supersteps = 1;
  }
  if (globals->create && (!(globals->remaining_loop % supersteps))) {
    XBT_INFO("Begin a new loop. Still to do: %d", globals->remaining_loop);
  } else if (!(globals->remaining_loop % supersteps)) {
    XBT_VERB("Got token(%d) from %s remaining_loop=%d",
          msg, gras_socket_peer_name(expeditor), globals->remaining_loop);
  }

  /* 4. If the ring shouldn't be stopped yet */
  if (globals->remaining_loop > 0) {
    msg += 1;

    XBT_DEBUG("Send token(%d) to %s:%d", msg,
           gras_socket_peer_name(globals->tosuccessor),
           gras_socket_peer_port(globals->tosuccessor));

    /* 5. Send the token as payload of a stoken message to the successor */
    TRY {
      gras_msg_send(globals->tosuccessor, "stoken", &msg);

      /* 6. Deal with errors */
    }
    CATCH_ANONYMOUS {
      gras_socket_close(globals->sock);
      RETHROWF("Unable to forward token: %s");
    }

  }

  /* DO NOT CLOSE THE expeditor SOCKET since the client socket is
     reused by our predecessor.
     Closing this side would thus cause trouble */

  /* 7. Decrease the remaining_loop integer. */
  globals->remaining_loop -= 1;

  /* 8. Report the hop number to the user at the end */
  if (globals->remaining_loop == -1 && globals->create) {
    double elapsed = gras_os_time() - globals->start_time;
    XBT_INFO("Shut down the token-ring. There was %d hops.", msg);
    XBT_VERB("Elapsed time: %g", elapsed);
  }

  /* 9. Tell GRAS that we consumed this message */
  return 0;
}                               /* end_of_node_cb_stoken_handler */
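The interplay between remaining_loop and the token value is easier to follow when replayed sequentially. The sketch below is a plain C simulation of the counters only, under our reading of the listings (the creator starts with NBLOOPS - 1 remaining loops, as set in the main function, and every other node with NBLOOPS). The simulate_hops function and the MAX_NODES bound are hypothetical, and no GRAS call is involved.

```c
#include <assert.h>

#define MAX_NODES 16  /* hypothetical upper bound for this sketch */

/* Replay the callback's counter logic without any networking.
 * Node 0 plays the creator. Returns the token value seen by the
 * creator when it shuts the ring down, i.e. the reported hop count. */
int simulate_hops(int nb_nodes, int nb_loops)
{
  int remaining[MAX_NODES];
  int msg = 0;                        /* the token is created with value 0 */
  int node;

  assert(nb_nodes >= 1 && nb_nodes <= MAX_NODES && nb_loops >= 1);

  for (int i = 0; i < nb_nodes; i++)
    remaining[i] = nb_loops;
  remaining[0] = nb_loops - 1;        /* the creator already sent the token once */
  node = 1 % nb_nodes;                /* first receiver: the creator's successor */

  for (;;) {
    int forward = remaining[node] > 0;    /* step 4 of the callback */
    if (forward)
      msg += 1;                           /* incremented before forwarding */
    remaining[node] -= 1;                 /* step 7 */
    if (node == 0 && remaining[0] == -1)  /* step 8: the creator ends the ring */
      return msg;
    assert(forward);                      /* only the creator's last receipt stops the token */
    node = (node + 1) % nb_nodes;
  }
}
```

In this model, the value reported at shutdown is nb_nodes * nb_loops - 1, because the token is created with value 0 and incremented on every forward but not on the initial send.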

4) The main function

This function is split in two parts. The first part performs all the needed initialisations (points 1-7) and, on the designated node, creates the token. The last part (the loop at the end, marked as point 8 below) calls gras_msg_handle() until the planned number of ring loops has been performed.

int node(int argc, char *argv[])
{
  node_data_t *globals;

  const char *host;
  int myport;
  int peerport;

  /* 1. Init the GRAS infrastructure and declare my globals */
  gras_init(&argc, argv);
  globals = gras_userdata_new(node_data_t);


  /* 2. Get the successor's address. The command line overrides
     defaults when specified */
  host = "127.0.0.1";
  myport = 4000;
  peerport = 4000;
  if (argc >= 4) {
    myport = atoi(argv[1]);
    host = argv[2];
    peerport = atoi(argv[3]);
  }

  /* 3. Initialize the node's state (globals) */
  globals->remaining_loop = NBLOOPS;
  globals->create = 0;
  globals->tosuccessor = NULL;

  if (!(gras_os_getpid() % 100) || gras_if_RL())
    XBT_INFO("Launch node listening on %d (successor on %s:%d)",
          myport, host, peerport);

  /* 4. Register the known messages.  */
  gras_msgtype_declare("stoken", gras_datadesc_by_name("int"));

  /* 5. Create my master socket for listening */
  globals->sock = gras_socket_server(myport);
  gras_os_sleep(1.0);           /* Make sure all server sockets are created */

  /* 6. Create socket to the successor on the ring */
  XBT_DEBUG("Connect to my successor on %s:%d", host, peerport);

  globals->tosuccessor = gras_socket_client(host, peerport);

  /* 7. Register my callback */
  gras_cb_register("stoken", &node_cb_stoken_handler);

  /* 8. One node has to create the token at startup. 
     It's specified by a command line argument */
  if (argc >= 5
      && !strncmp("--create-token", argv[4], strlen("--create-token")))
    globals->create = 1;

  if (globals->create) {
    int token = 0;

    globals->start_time = gras_os_time();

    globals->remaining_loop = NBLOOPS - 1;

    XBT_INFO("Create the token (with value %d) and send it to %s:%d",
          token, host, peerport);

    TRY {
      gras_msg_send(globals->tosuccessor, "stoken", &token);
    }
    CATCH_ANONYMOUS {
      RETHROWF("Unable to send the freshly created token: %s");
    }
  }

  /* 8. Wait for incoming messages and handle them until all loops are done */
  while (globals->remaining_loop > (globals->create ? -1 : 0)) {
    gras_msg_handle(-1);

    XBT_DEBUG("looping (remaining_loop=%d)", globals->remaining_loop);
  }

  gras_os_sleep(1.0);           /* FIXME: if the sender has already quit, the receive fails */

  /* 9. Free the allocated resources, and shut GRAS down */
  gras_socket_close(globals->sock);
  gras_socket_close(globals->tosuccessor);
  free(globals);
  gras_exit();

  return 0;
}                               /* end_of_node */
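The main function follows a common pattern: register a callback for a message type, then repeatedly ask the library to handle the next message, which invokes the matching callback. The sketch below shows that pattern in plain C. It is not how GRAS is implemented: cb_registry_t, cb_register, msg_dispatch and on_stoken are hypothetical names introduced for this illustration only.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_CB 8   /* hypothetical capacity of the registry */

/* A handler returns 0 when it consumed the message (as in the listing) */
typedef int (*msg_handler_t)(void *payload);

typedef struct {
  const char *names[MAX_CB];        /* message type names */
  msg_handler_t handlers[MAX_CB];   /* handler attached to each name */
  size_t count;                     /* number of registered handlers */
} cb_registry_t;

/* Attach a handler to a message type name. Returns 0 on success,
 * -1 when the registry is full. */
int cb_register(cb_registry_t *reg, const char *name, msg_handler_t handler)
{
  assert(reg != NULL && name != NULL && handler != NULL);
  if (reg->count >= MAX_CB)
    return -1;
  reg->names[reg->count] = name;
  reg->handlers[reg->count] = handler;
  reg->count += 1;
  return 0;
}

/* Dispatch one message to the first handler registered for its type.
 * Returns the handler's result, or -1 if no handler matches. */
int msg_dispatch(const cb_registry_t *reg, const char *name, void *payload)
{
  assert(reg != NULL && name != NULL);
  for (size_t i = 0; i < reg->count; i++)
    if (strcmp(reg->names[i], name) == 0)
      return reg->handlers[i](payload);
  return -1;
}

/* Example handler: increments the integer token it receives */
int on_stoken(void *payload)
{
  *(int *) payload += 1;
  return 0;
}
```

In this sketch, registering on_stoken under the name "stoken" and dispatching an integer token increments it by one, mirroring what happens at each hop of the ring.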


The version of SimGrid documented here is v3.6.1.
Documentation of other versions can be found in their respective archive files (directory doc/html).