~ubuntu-branches/ubuntu/wily/phabricator/wily

« back to all changes in this revision

Viewing changes to src/hgdaemon/ArcanistHgProxyServer.php

  • Committer: Package Import Robot
  • Author(s): Richard Sellam
  • Date: 2014-11-01 23:20:06 UTC
  • mto: This revision was merged to the branch mainline in revision 4.
  • Revision ID: package-import@ubuntu.com-20141101232006-mvlnp0cil67tsboe
Tags: upstream-0~git20141101/arcanist
Import upstream version 0~git20141101, component arcanist

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
<?php
 
2
 
 
3
/**
 
4
 * Server which @{class:ArcanistHgProxyClient} clients connect to. This
 
5
 * server binds to a Mercurial working copy and creates a Mercurial process and
 
6
 * a unix domain socket in that working copy. It listens for connections on
 
7
 * the socket, reads commands from them, and forwards their requests to the
 
8
 * Mercurial process. It then returns responses to the original clients.
 
9
 *
 
10
 * Note that this server understands the underlying protocol and completely
 
11
 * decodes messages from both the client and server before re-encoding them
 
12
 * and relaying them to their final destinations. It must do this (at least
 
13
 * in part) to determine where messages begin and end. Additionally, this proxy
 
14
 * sends and receives the Mercurial cmdserver protocol exactly, without
 
15
 * any extensions or sneakiness.
 
16
 *
 
17
 * The advantage of this mechanism is that it avoids the overhead of starting
 
18
 * a Mercurial process for each Mercurial command, which can exceed 100ms per
 
19
 * invocation. This server can also accept connections from multiple clients
 
20
 * and serve them from a single Mercurial server process.
 
21
 *
 
22
 * @task construct  Construction
 
23
 * @task config     Configuration
 
24
 * @task server     Serving Requests
 
25
 * @task client     Managing Clients
 
26
 * @task hg         Managing Mercurial
 
27
 * @task internal   Internals
 
28
 */
 
29
final class ArcanistHgProxyServer {
 
30
 
 
31
  private $workingCopy;
 
32
  private $socket;
 
33
  private $hello;
 
34
 
 
35
  private $quiet;
 
36
 
 
37
  private $clientLimit;
 
38
  private $lifetimeClientCount;
 
39
 
 
40
  private $idleLimit;
 
41
  private $idleSince;
 
42
 
 
43
  private $skipHello;
 
44
 
 
45
  private $doNotDaemonize;
 
46
 
 
47
 
 
48
/* -(  Construction  )------------------------------------------------------- */
 
49
 
 
50
 
 
51
  /**
 
52
   * Build a new server. This server is bound to a working copy. The server
 
53
   * is inactive until you @{method:start} it.
 
54
   *
 
55
   * @param string Path to a Mercurial working copy.
 
56
   *
 
57
   * @task construct
 
58
   */
 
59
  public function __construct($working_copy) {
 
60
    $this->workingCopy = Filesystem::resolvePath($working_copy);
 
61
  }
 
62
 
 
63
 
 
64
/* -(  Configuration  )------------------------------------------------------ */
 
65
 
 
66
 
 
67
  /**
 
68
   * Disable status messages to stdout. Controlled with `--quiet`.
 
69
   *
 
70
   * @param bool  True to disable status messages.
 
71
   * @return this
 
72
   *
 
73
   * @task config
 
74
   */
 
75
  public function setQuiet($quiet) {
 
76
    $this->quiet = $quiet;
 
77
    return $this;
 
78
  }
 
79
 
 
80
 
 
81
  /**
 
82
   * Configure a client limit. After serving this many clients, the server
 
83
   * will exit. Controlled with `--client-limit`.
 
84
   *
 
85
   * You can use `--client-limit 1` with `--xprofile` and `--do-not-daemonize`
 
86
   * to profile the server.
 
87
   *
 
88
   * @param int Client limit, or 0 to disable limit.
 
89
   * @return this
 
90
   *
 
91
   * @task config
 
92
   */
 
93
  public function setClientLimit($limit) {
 
94
    $this->clientLimit = $limit;
 
95
    return $this;
 
96
  }
 
97
 
 
98
 
 
99
  /**
 
100
   * Configure an idle time limit. After this many seconds idle, the server
 
101
   * will exit. Controlled with `--idle-limit`.
 
102
   *
 
103
   * @param int Idle limit, or 0 to disable limit.
 
104
   * @return this
 
105
   *
 
106
   * @task config
 
107
   */
 
108
  public function setIdleLimit($limit) {
 
109
    $this->idleLimit = $limit;
 
110
    return $this;
 
111
  }
 
112
 
 
113
 
 
114
  /**
 
115
   * When clients connect, do not send the "capabilities" message expected by
 
116
   * the Mercurial protocol. This deviates from the protocol and will only work
 
117
   * if the clients are also configured not to expect the message, but slightly
 
118
   * improves performance. Controlled with --skip-hello.
 
119
   *
 
120
   * @param bool True to skip the "capabilities" message.
 
121
   * @return this
 
122
   *
 
123
   * @task config
 
124
   */
 
125
  public function setSkipHello($skip) {
 
126
    $this->skipHello = $skip;
 
127
    return $this;
 
128
  }
 
129
 
 
130
 
 
131
  /**
 
132
   * Configure whether the server runs in the foreground or daemonizes.
 
133
   * Controlled by --do-not-daemonize. Primarily useful for debugging.
 
134
   *
 
135
   * @param bool True to run in the foreground.
 
136
   * @return this
 
137
   *
 
138
   * @task config
 
139
   */
 
140
  public function setDoNotDaemonize($do_not_daemonize) {
 
141
    $this->doNotDaemonize = $do_not_daemonize;
 
142
    return $this;
 
143
  }
 
144
 
 
145
 
 
146
/* -(  Serving Requests  )--------------------------------------------------- */
 
147
 
 
148
 
 
149
  /**
 
150
   * Start the server. This method returns after the client limit or idle
 
151
   * limit are exceeded. If neither limit is configured, this method does not
 
152
   * exit.
 
153
   *
 
154
   * @return null
 
155
   *
 
156
   * @task server
 
157
   */
 
158
  public function start() {
 
159
    // Create the unix domain socket in the working copy to listen for clients.
 
160
    $socket = $this->startWorkingCopySocket();
 
161
    $this->socket = $socket;
 
162
 
 
163
    if (!$this->doNotDaemonize) {
 
164
      $this->daemonize();
 
165
    }
 
166
 
 
167
    // Start the Mercurial process which we'll forward client requests to.
 
168
    $hg = $this->startMercurialProcess();
 
169
    $clients = array();
 
170
 
 
171
    $this->log(null, 'Listening');
 
172
    $this->idleSince = time();
 
173
    while (true) {
 
174
      // Wait for activity on any active clients, the Mercurial process, or
 
175
      // the listening socket where new clients connect.
 
176
      PhutilChannel::waitForAny(
 
177
        array_merge($clients, array($hg)),
 
178
        array(
 
179
          'read'    => $socket ? array($socket) : array(),
 
180
          'except'  => $socket ? array($socket) : array(),
 
181
        ));
 
182
 
 
183
      if (!$hg->update()) {
 
184
        throw new Exception('Server exited unexpectedly!');
 
185
      }
 
186
 
 
187
      // Accept any new clients.
 
188
      while ($socket && ($client = $this->acceptNewClient($socket))) {
 
189
        $clients[] = $client;
 
190
        $key = last_key($clients);
 
191
        $client->setName($key);
 
192
 
 
193
        $this->log($client, 'Connected');
 
194
        $this->idleSince = time();
 
195
 
 
196
        // Check if we've hit the client limit. If there's a configured
 
197
        // client limit and we've hit it, stop accepting new connections
 
198
        // and close the socket.
 
199
 
 
200
        $this->lifetimeClientCount++;
 
201
 
 
202
        if ($this->clientLimit) {
 
203
          if ($this->lifetimeClientCount >= $this->clientLimit) {
 
204
            $this->closeSocket();
 
205
            $socket = null;
 
206
          }
 
207
        }
 
208
      }
 
209
 
 
210
      // Update all the active clients.
 
211
      foreach ($clients as $key => $client) {
 
212
        if ($this->updateClient($client, $hg)) {
 
213
          // In this case, the client is still connected so just move on to
 
214
          // the next one. Otherwise we continue below and handle the
 
215
          // disconnect.
 
216
          continue;
 
217
        }
 
218
 
 
219
        $this->log($client, 'Disconnected');
 
220
        unset($clients[$key]);
 
221
 
 
222
        // If we have a client limit and we've served that many clients, exit.
 
223
 
 
224
        if ($this->clientLimit) {
 
225
          if ($this->lifetimeClientCount >= $this->clientLimit) {
 
226
            if (!$clients) {
 
227
              $this->log(null, 'Exiting (Client Limit)');
 
228
              return;
 
229
            }
 
230
          }
 
231
        }
 
232
      }
 
233
 
 
234
      // If we have an idle limit and haven't had any activity in at least
 
235
      // that long, exit.
 
236
      if ($this->idleLimit) {
 
237
        $remaining = $this->idleLimit - (time() - $this->idleSince);
 
238
        if ($remaining <= 0) {
 
239
          $this->log(null, 'Exiting (Idle Limit)');
 
240
          return;
 
241
        }
 
242
        if ($remaining <= 5) {
 
243
          $this->log(null, 'Exiting in '.$remaining.' seconds');
 
244
        }
 
245
      }
 
246
    }
 
247
  }
 
248
 
 
249
 
 
250
  /**
 
251
   * Update one client, processing any commands it has sent us. We fully
 
252
   * process all commands we've received here before returning to the main
 
253
   * server loop.
 
254
   *
 
255
   * @param ArcanistHgClientChannel The client to update.
 
256
   * @param ArcanistHgServerChannel The Mercurial server.
 
257
   *
 
258
   * @task server
 
259
   */
 
260
  private function updateClient(
 
261
    ArcanistHgClientChannel $client,
 
262
    ArcanistHgServerChannel $hg) {
 
263
 
 
264
    if (!$client->update()) {
 
265
      // Client has disconnected, don't bother proceeding.
 
266
      return false;
 
267
    }
 
268
 
 
269
    // Read a command from the client if one is available. Note that we stop
 
270
    // updating other clients or accepting new connections while processing a
 
271
    // command, since there isn't much we can do with them until the server
 
272
    // finishes executing this command.
 
273
    $message = $client->read();
 
274
    if (!$message) {
 
275
      return true;
 
276
    }
 
277
 
 
278
    $this->log($client, '$ '.$message[0].' '.$message[1]);
 
279
    $t_start = microtime(true);
 
280
 
 
281
    // Forward the command to the server.
 
282
    $hg->write($message);
 
283
 
 
284
    while (true) {
 
285
      PhutilChannel::waitForAny(array($client, $hg));
 
286
 
 
287
      if (!$client->update() || !$hg->update()) {
 
288
        // If either the client or server has exited, bail.
 
289
        return false;
 
290
      }
 
291
 
 
292
      $response = $hg->read();
 
293
      if (!$response) {
 
294
        continue;
 
295
      }
 
296
 
 
297
      // Forward the response back to the client.
 
298
      $client->write($response);
 
299
 
 
300
      // If the response was on the 'r'esult channel, it indicates the end
 
301
      // of the command output. We can process the next command (if any
 
302
      // remain) or go back to accepting new connections and servicing
 
303
      // other clients.
 
304
      if ($response[0] == 'r') {
 
305
        // Update the client immediately to try to get the bytes on the wire
 
306
        // as quickly as possible. This gives us slightly more throughput.
 
307
        $client->update();
 
308
        break;
 
309
      }
 
310
    }
 
311
 
 
312
    // Log the elapsed time.
 
313
    $t_end = microtime(true);
 
314
    $t = 1000000 * ($t_end - $t_start);
 
315
    $this->log($client, '< '.number_format($t, 0).'us');
 
316
 
 
317
    $this->idleSince = time();
 
318
 
 
319
    return true;
 
320
  }
 
321
 
 
322
 
 
323
/* -(  Managing Clients  )--------------------------------------------------- */
 
324
 
 
325
 
 
326
  /**
 
327
   * @task client
 
328
   */
 
329
  public static function getPathToSocket($working_copy) {
 
330
    return $working_copy.'/.hg/hgdaemon-socket';
 
331
  }
 
332
 
 
333
 
 
334
  /**
 
335
   * @task client
 
336
   */
 
337
  private function startWorkingCopySocket() {
 
338
    $errno = null;
 
339
    $errstr = null;
 
340
 
 
341
    $socket_path = self::getPathToSocket($this->workingCopy);
 
342
    $socket_uri  = 'unix://'.$socket_path;
 
343
 
 
344
    $socket = @stream_socket_server($socket_uri, $errno, $errstr);
 
345
    if ($errno || !$socket) {
 
346
      Filesystem::remove($socket_path);
 
347
      $socket = @stream_socket_server($socket_uri, $errno, $errstr);
 
348
    }
 
349
 
 
350
    if ($errno || !$socket) {
 
351
      throw new Exception(
 
352
        "Unable to start socket! Error #{$errno}: {$errstr}");
 
353
    }
 
354
 
 
355
    $ok = stream_set_blocking($socket, 0);
 
356
    if ($ok === false) {
 
357
      throw new Exception('Unable to set socket nonblocking!');
 
358
    }
 
359
 
 
360
    return $socket;
 
361
  }
 
362
 
 
363
 
 
364
  /**
 
365
   * @task client
 
366
   */
 
367
  private function acceptNewClient($socket) {
 
368
    // NOTE: stream_socket_accept() always blocks, even when the socket has
 
369
    // been set nonblocking.
 
370
    $new_client = @stream_socket_accept($socket, $timeout = 0);
 
371
    if (!$new_client) {
 
372
      return null;
 
373
    }
 
374
 
 
375
    $channel = new PhutilSocketChannel($new_client);
 
376
    $client = new ArcanistHgClientChannel($channel);
 
377
 
 
378
    if (!$this->skipHello) {
 
379
      $client->write($this->hello);
 
380
    }
 
381
 
 
382
    return $client;
 
383
  }
 
384
 
 
385
 
 
386
/* -(  Managing Mercurial  )------------------------------------------------- */
 
387
 
 
388
 
 
389
  /**
 
390
   * Starts a Mercurial process which can actually handle requests.
 
391
   *
 
392
   * @return ArcanistHgServerChannel  Channel to the Mercurial server.
 
393
   * @task hg
 
394
   */
 
395
  private function startMercurialProcess() {
 
396
    // NOTE: "cmdserver.log=-" makes Mercurial use the 'd'ebug channel for
 
397
    // log messages.
 
398
 
 
399
    $future = new ExecFuture(
 
400
      'HGPLAIN=1 hg --config cmdserver.log=- serve --cmdserver pipe');
 
401
    $future->setCWD($this->workingCopy);
 
402
 
 
403
    $channel = new PhutilExecChannel($future);
 
404
    $hg = new ArcanistHgServerChannel($channel);
 
405
 
 
406
    // The server sends a "hello" message with capability and encoding
 
407
    // information. Save it and forward it to clients when they connect.
 
408
    $this->hello = $hg->waitForMessage();
 
409
 
 
410
    return $hg;
 
411
  }
 
412
 
 
413
 
 
414
/* -(  Internals  )---------------------------------------------------------- */
 
415
 
 
416
 
 
417
  /**
 
418
   * Close and remove the unix domain socket in the working copy.
 
419
   *
 
420
   * @task internal
 
421
   */
 
422
  public function __destruct() {
 
423
    $this->closeSocket();
 
424
  }
 
425
 
 
426
  private function closeSocket() {
 
427
    if ($this->socket) {
 
428
      @stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR);
 
429
      @fclose($this->socket);
 
430
      Filesystem::remove(self::getPathToSocket($this->workingCopy));
 
431
      $this->socket = null;
 
432
    }
 
433
  }
 
434
 
 
435
  private function log($client, $message) {
 
436
    if ($this->quiet) {
 
437
      return;
 
438
    }
 
439
 
 
440
    if ($client) {
 
441
      $message = '[Client '.$client->getName().'] '.$message;
 
442
    } else {
 
443
      $message = '[Server] '.$message;
 
444
    }
 
445
 
 
446
    echo $message."\n";
 
447
  }
 
448
 
 
449
  private function daemonize() {
 
450
    // Keep stdout if it's been redirected somewhere, otherwise shut it down.
 
451
    $keep_stdout = false;
 
452
    $keep_stderr = false;
 
453
    if (function_exists('posix_isatty')) {
 
454
      if (!posix_isatty(STDOUT)) {
 
455
        $keep_stdout = true;
 
456
      }
 
457
      if (!posix_isatty(STDERR)) {
 
458
        $keep_stderr = true;
 
459
      }
 
460
    }
 
461
 
 
462
    $pid = pcntl_fork();
 
463
    if ($pid === -1) {
 
464
      throw new Exception('Unable to fork!');
 
465
    } else if ($pid) {
 
466
      // We're the parent; exit. First, drop our reference to the socket so
 
467
      // our __destruct() doesn't tear it down; the child will tear it down
 
468
      // later.
 
469
      $this->socket = null;
 
470
      exit(0);
 
471
    }
 
472
 
 
473
    // We're the child; continue.
 
474
 
 
475
    fclose(STDIN);
 
476
 
 
477
    if (!$keep_stdout) {
 
478
      fclose(STDOUT);
 
479
      $this->quiet = true;
 
480
    }
 
481
 
 
482
    if (!$keep_stderr) {
 
483
      fclose(STDERR);
 
484
    }
 
485
  }
 
486
 
 
487
}