254
GstNetClientClock *clock;
256
} GstNetClientClockTimeoutSource;
259
/* GSource "prepare" callback of the custom timeout source.
 *
 * Reads the owning clock's timeout_expiration and the current timestamp and,
 * on the path visible here, stores the remaining time in milliseconds in
 * *p_timeout so the main context knows how long it may sleep.
 *
 * NOTE(review): the body of the "expired or almost expired" branch and the
 * return statements are not visible in this excerpt - confirm against the
 * full file. */
gst_net_client_clock_timeout_source_prepare (GSource * s, gint * p_timeout)
261
GstNetClientClockTimeoutSource *source = (GstNetClientClockTimeoutSource *) s;
262
GstClockTime expiration_time = source->clock->priv->timeout_expiration;
263
GstClockTime now = gst_util_get_timestamp ();
265
/* deadline already reached, or GST_MSECOND or less left until it */
if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) {
270
/* remaining time, truncated to whole milliseconds by the integer division */
*p_timeout = (expiration_time - now) / GST_MSECOND;
271
GST_TRACE_OBJECT (source->clock, "time out in %d ms please", *p_timeout);
276
/* GSource "check" callback of the custom timeout source.
 *
 * Returns non-zero once the current timestamp has reached the owning clock's
 * timeout_expiration. */
gst_net_client_clock_timeout_source_check (GSource * s)
278
GstNetClientClockTimeoutSource *source = (GstNetClientClockTimeoutSource *) s;
280
return (gst_util_get_timestamp () >= source->clock->priv->timeout_expiration);
284
/* GSource "dispatch" callback of the custom timeout source.
 *
 * Logs the timeout and sets the gboolean that source->p_timeout points to
 * to TRUE.
 *
 * NOTE(review): the rest of the parameter list and the return statement are
 * not visible in this excerpt; whether cb is used cannot be told from here. */
gst_net_client_clock_timeout_source_dispatch (GSource * s, GSourceFunc cb,
287
GstNetClientClockTimeoutSource *source = (GstNetClientClockTimeoutSource *) s;
289
GST_TRACE_OBJECT (source->clock, "timed out");
290
/* signal the timeout through the caller-provided flag */
*source->p_timeout = TRUE;
295
/* Callback attached to the socket source.
 *
 * Treats user_data as a pointer to a GIOCondition and traces the socket and
 * the condition it was invoked with.
 *
 * NOTE(review): presumably the condition is also stored through p_cond, but
 * that statement and the return value are not visible here - confirm. */
gst_net_client_clock_socket_cb (GSocket * socket, GIOCondition condition,
298
GIOCondition *p_cond = user_data;
300
GST_TRACE ("socket %p I/O condition: 0x%02x", socket, condition);
306
252
/* Worker thread of the net client clock; data is the GstNetClientClock.
 *
 * As far as this excerpt shows, the thread creates a private GMainContext,
 * attaches a socket source and a custom timeout source to it, and then loops
 * until self->priv->cancel is cancelled. On timeout it sends a time packet
 * stamped with the local internal time and pushes the deadline forward; when
 * a packet is received it hands the timestamps to
 * gst_net_client_clock_observe_times().
 *
 * NOTE(review): this excerpt appears to interleave two revisions of the
 * function (one waiting in g_main_context_iteration(), one waiting in
 * g_socket_condition_timed_wait()) and lacks braces, some declarations
 * (ctx, source, timeout, cond) and several statements. Confirm statement
 * order against the real file before relying on it. */
gst_net_client_clock_thread (gpointer data)
308
254
GstNetClientClock *self = data;
309
255
GstNetTimePacket *packet;
311
GSourceFuncs funcs = { NULL, };
315
256
GSocket *socket = self->priv->socket;
316
257
GError *err = NULL;
317
258
/* same pointer as self, typed as GstClock for gst_clock_get_timeout() below */
GstClock *clock = data;
321
262
g_socket_set_blocking (socket, TRUE);
322
263
/* 0 means no GSocket-level timeout (see GSocket docs) */
g_socket_set_timeout (socket, 0);
324
ctx = g_main_context_new ();
326
/* socket source: fires on incoming data, tied to the cancellable */
source = g_socket_create_source (socket, G_IO_IN, self->priv->cancel);
327
g_source_set_name (source, "GStreamer net client clock thread socket");
328
g_source_set_callback (source, (GSourceFunc) gst_net_client_clock_socket_cb,
330
g_source_attach (source, ctx);
331
/* drop our reference; the context keeps the source alive after attach */
g_source_unref (source);
333
/* GSocket only supports second granularity for timeouts, so roll our own
334
* timeout source (so we don't have to create a new source whenever the
335
* timeout changes, as we would have to do with the default timeout source) */
336
funcs.prepare = gst_net_client_clock_timeout_source_prepare;
337
funcs.check = gst_net_client_clock_timeout_source_check;
338
funcs.dispatch = gst_net_client_clock_timeout_source_dispatch;
339
funcs.finalize = NULL;
340
/* allocate the source with room for the extra clock/p_timeout fields */
source = g_source_new (&funcs, sizeof (GstNetClientClockTimeoutSource));
341
((GstNetClientClockTimeoutSource *) source)->clock = self;
342
/* the dispatch callback sets this flag; its declaration is not visible here */
((GstNetClientClockTimeoutSource *) source)->p_timeout = &timeout;
343
g_source_set_name (source, "GStreamer net client clock timeout");
344
g_source_attach (source, ctx);
345
g_source_unref (source);
347
265
/* main loop: runs until the cancellable is triggered */
while (!g_cancellable_is_cancelled (self->priv->cancel)) {
350
/* may_block = TRUE: sleep until one of the attached sources is ready */
g_main_context_iteration (ctx, TRUE);
352
if (g_cancellable_is_cancelled (self->priv->cancel))
356
/* timed out, let's send another packet */
357
GST_DEBUG_OBJECT (self, "timed out");
359
packet = gst_net_time_packet_new (NULL);
361
/* stamp the request with our internal clock time at send */
packet->local_time = gst_clock_get_internal_time (GST_CLOCK (self));
363
GST_DEBUG_OBJECT (self, "sending packet, local time = %" GST_TIME_FORMAT,
364
GST_TIME_ARGS (packet->local_time));
366
/* NOTE(review): the error argument is NULL and no result is checked in the
 * visible code, so send failures appear to go unnoticed - confirm */
gst_net_time_packet_send (packet, self->priv->socket,
367
self->priv->servaddr, NULL);
371
/* reset timeout (but are expecting a response sooner anyway) */
372
self->priv->timeout_expiration =
373
gst_util_get_timestamp () + gst_clock_get_timeout (clock);
266
GstClockTime expiration_time = self->priv->timeout_expiration;
267
GstClockTime now = gst_util_get_timestamp ();
268
gint64 socket_timeout;
270
/* deadline already reached, or GST_MSECOND or less left until it */
if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) {
273
/* remaining time expressed in microseconds for the timed wait below */
socket_timeout = (expiration_time - now) / GST_USECOND;
377
/* got data to read? */
378
if ((cond & G_IO_IN)) {
276
/* NOTE(review): socket_timeout is a gint64 but is formatted with %d;
 * G_GINT64_FORMAT looks like the matching specifier - confirm */
GST_TRACE_OBJECT (self, "time out in %d microsecs please", socket_timeout);
278
if (!g_socket_condition_timed_wait (socket, G_IO_IN, socket_timeout,
279
self->priv->cancel, &err)) {
280
/* cancelled, timeout or error */
281
/* NOTE(review): err->code is compared without checking err->domain;
 * g_error_matches() would be the stricter test - confirm */
if (err->code == G_IO_ERROR_CANCELLED) {
282
GST_INFO_OBJECT (self, "cancelled");
283
g_clear_error (&err);
285
} else if (err->code == G_IO_ERROR_TIMED_OUT) {
286
/* timed out, let's send another packet */
287
GST_DEBUG_OBJECT (self, "timed out");
289
packet = gst_net_time_packet_new (NULL);
291
packet->local_time = gst_clock_get_internal_time (GST_CLOCK (self));
293
GST_DEBUG_OBJECT (self,
294
"sending packet, local time = %" GST_TIME_FORMAT,
295
GST_TIME_ARGS (packet->local_time));
297
gst_net_time_packet_send (packet, self->priv->socket,
298
self->priv->servaddr, NULL);
302
/* reset timeout (but are expecting a response sooner anyway) */
303
self->priv->timeout_expiration =
304
gst_util_get_timestamp () + gst_clock_get_timeout (clock);
306
GST_DEBUG_OBJECT (self, "socket error: %s", err->message);
307
g_usleep (G_USEC_PER_SEC / 10); /* throttle */
309
g_clear_error (&err);
379
311
GstClockTime new_local;
381
315
/* take the local receive timestamp before reading the packet */
new_local = gst_clock_get_internal_time (GST_CLOCK (self));
383
317
packet = gst_net_time_packet_receive (socket, NULL, &err);
319
if (packet != NULL) {
320
GST_LOG_OBJECT (self, "got packet back");
321
GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
322
GST_TIME_ARGS (packet->local_time));
323
GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT,
324
GST_TIME_ARGS (packet->remote_time));
325
GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
326
GST_TIME_ARGS (new_local));
328
/* observe_times will reset the timeout */
329
gst_net_client_clock_observe_times (self, packet->local_time,
330
packet->remote_time, new_local);
333
} else if (err != NULL) {
386
334
GST_WARNING_OBJECT (self, "receive error: %s", err->message);
335
g_clear_error (&err);
392
GST_LOG_OBJECT (self, "got packet back");
393
GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
394
GST_TIME_ARGS (packet->local_time));
395
GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT,
396
GST_TIME_ARGS (packet->remote_time));
397
GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
398
GST_TIME_ARGS (new_local));
400
/* observe_times will reset the timeout */
401
gst_net_client_clock_observe_times (self, packet->local_time,
402
packet->remote_time, new_local);
408
/* error or hang-up reported on the socket: log and throttle for 1/10 s */
if ((cond & (G_IO_ERR | G_IO_HUP))) {
409
/* NOTE(review): errno may not describe the GIOCondition reported here -
 * confirm it is meaningful at this point */
GST_DEBUG_OBJECT (self, "socket error?! %s", g_strerror (errno));
410
g_usleep (G_USEC_PER_SEC / 10);
415
340
GST_INFO_OBJECT (self, "shutting down net client clock thread");
416
/* release the private main context created at thread start */
g_main_context_unref (ctx);