1
/* -*- mode: c; c-basic-offset: 4; -*-
3
* remote-client.c - A client for communicating with remote Fyre
4
* servers. The remote Fyre servers may actually
5
* be on the local machine, or they may be connected
8
* Fyre - rendering and interactive exploration of chaotic functions
9
* Copyright (C) 2004-2005 David Trowbridge and Micah Dowty
11
* This program is free software; you can redistribute it and/or
12
* modify it under the terms of the GNU General Public License
13
* as published by the Free Software Foundation; either version 2
14
* of the License, or (at your option) any later version.
16
* This program is distributed in the hope that it will be useful,
17
* but WITHOUT ANY WARRANTY; without even the implied warranty of
18
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19
* GNU General Public License for more details.
21
* You should have received a copy of the GNU General Public License
22
* along with this program; if not, write to the Free Software
23
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
34
#include "remote-client.h"
36
static void remote_client_class_init (RemoteClientClass* klass);
37
static void remote_client_init (RemoteClient* self);
38
static void remote_client_dispose (GObject* gobject);
39
static void remote_client_callback (GConn* gconn,
42
static void remote_client_recv_binary (RemoteClient* self,
44
static void remote_client_recv_line (RemoteClient* self,
46
static void remote_client_update_status (RemoteClient* self,
49
static void histogram_merge_callback (RemoteClient* self,
50
RemoteResponse* response,
52
static void remote_client_start_retry (RemoteClient* self);
53
static void remote_client_stop_retry (RemoteClient* self);
54
static gboolean remote_client_retry_callback(gpointer user_data);
55
static void remote_client_empty_queue (RemoteClient* self);
57
/* Smallest time interval, in seconds, to allow in speed calculations */
58
#define MINIMUM_SPEED_WINDOW 1.0
61
/************************************************************************************/
62
/**************************************************** Initialization / Finalization */
63
/************************************************************************************/
65
GType remote_client_get_type(void)
67
static GType anim_type = 0;
70
static const GTypeInfo dj_info = {
71
sizeof(RemoteClientClass),
73
NULL, /* base_finalize */
74
(GClassInitFunc) remote_client_class_init,
75
NULL, /* class_finalize */
76
NULL, /* class_data */
79
(GInstanceInitFunc) remote_client_init,
82
anim_type = g_type_register_static(G_TYPE_OBJECT, "RemoteClient", &dj_info, 0);
88
static void remote_client_class_init(RemoteClientClass *klass)
90
GObjectClass *object_class;
91
object_class = (GObjectClass*) klass;
93
object_class->dispose = remote_client_dispose;
96
static void remote_client_dispose(GObject *gobject)
98
RemoteClient *self = REMOTE_CLIENT(gobject);
100
remote_client_stop_retry(self);
103
gnet_conn_delete(self->gconn);
107
if (self->response_queue) {
108
remote_client_empty_queue(self);
109
g_queue_free(self->response_queue);
110
self->response_queue = NULL;
113
if (self->status_speed_timer) {
114
g_timer_destroy(self->status_speed_timer);
115
self->status_speed_timer = NULL;
117
if (self->stream_speed_timer) {
118
g_timer_destroy(self->stream_speed_timer);
119
self->stream_speed_timer = NULL;
121
if (self->stream_request_timer) {
122
g_timer_destroy(self->stream_request_timer);
123
self->stream_request_timer = NULL;
128
void remote_client_empty_queue (RemoteClient* self)
130
/* Empty the queue of any outstanding requests */
131
RemoteClosure *closure;
132
while ((closure = g_queue_pop_tail(self->response_queue)))
136
static void remote_client_init(RemoteClient *self)
138
self->response_queue = g_queue_new();
139
self->status_speed_timer = g_timer_new();
140
self->stream_speed_timer = g_timer_new();
141
self->stream_request_timer = g_timer_new();
143
/* Default stream interval: every second */
144
self->min_stream_interval = 1.0;
146
/* By default, retry connections every minute */
147
self->retry_timeout = 60.0;
148
self->is_retry_enabled = TRUE;
151
RemoteClient* remote_client_new (const gchar* hostname,
154
RemoteClient *self = REMOTE_CLIENT(g_object_new(remote_client_get_type(), NULL));
156
self->host = hostname;
162
void remote_client_set_status_cb (RemoteClient* self,
163
RemoteStatusCallback status_cb,
166
self->status_callback = status_cb;
167
self->status_callback_user_data = user_data;
170
void remote_client_set_speed_cb (RemoteClient* self,
171
RemoteSpeedCallback speed_cb,
174
self->speed_callback = speed_cb;
175
self->speed_callback_user_data = user_data;
178
void remote_client_connect (RemoteClient* self)
180
/* Clean up after the previous connection, if we need to */
182
gnet_conn_delete(self->gconn);
185
remote_client_empty_queue(self);
187
/* Reset our speed counters and rate limiting timers */
188
self->iter_accumulator = 0;
189
self->byte_accumulator = 0;
190
self->iters_per_sec = 0;
191
self->bytes_per_sec = 0;
192
g_timer_start(self->stream_request_timer);
193
g_timer_start(self->status_speed_timer);
194
g_timer_start(self->stream_speed_timer);
196
/* Create the new connection object */
197
self->gconn = gnet_conn_new(self->host, self->port, remote_client_callback, self);
198
gnet_conn_set_watch_error(self->gconn, TRUE);
199
gnet_conn_connect(self->gconn);
200
gnet_conn_readline(self->gconn);
202
remote_client_update_status(self, "Connecting...");
206
void remote_client_start_retry (RemoteClient* self)
208
remote_client_stop_retry(self);
209
if (self->is_retry_enabled)
210
self->retry_timer = g_timeout_add(self->retry_timeout * 1000,
211
remote_client_retry_callback,
216
void remote_client_stop_retry (RemoteClient* self)
218
if (self->retry_timer) {
219
g_source_remove(self->retry_timer);
220
self->retry_timer = 0;
225
gboolean remote_client_retry_callback(gpointer user_data)
227
RemoteClient* self = REMOTE_CLIENT(user_data);
228
self->retry_timer = 0;
229
remote_client_connect(self);
234
gboolean remote_client_is_ready (RemoteClient* self)
236
return self->is_ready;
240
/************************************************************************************/
241
/************************************************************** Low-level Interface */
242
/************************************************************************************/
244
static void remote_client_update_status (RemoteClient* self,
251
if (!self->status_callback)
255
msg = g_strdup_vprintf(fmt, ap);
258
self->status_callback(self, msg, self->status_callback_user_data);
263
void remote_client_command (RemoteClient* self,
264
RemoteCallback callback,
269
RemoteClosure *closure = g_new0(RemoteClosure, 1);
274
/* Add the response callback to our queue */
275
closure->callback = callback;
276
closure->user_data = user_data;
277
g_queue_push_head(self->response_queue, closure);
279
/* Assemble the caller's formatted string */
280
va_start(ap, format);
281
full_message = g_strdup_vprintf(format, ap);
284
/* Send a one-line command */
285
line = g_strdup_printf("%s\n", full_message);
286
gnet_conn_write(self->gconn, line, strlen(line));
287
g_free(full_message);
291
static void remote_client_callback (GConn* gconn,
295
RemoteClient* self = (RemoteClient*) user_data;
296
switch (event->type) {
299
if (self->current_binary_response)
300
remote_client_recv_binary(self, event);
302
remote_client_recv_line(self, event);
305
case GNET_CONN_CONNECT:
306
remote_client_update_status(self, "Connected");
309
case GNET_CONN_CLOSE:
310
self->is_ready = FALSE;
311
remote_client_update_status(self, "Connection closed");
312
remote_client_start_retry(self);
315
case GNET_CONN_TIMEOUT:
316
self->is_ready = FALSE;
317
remote_client_update_status(self, "Timed out");
318
remote_client_start_retry(self);
321
case GNET_CONN_ERROR:
322
self->is_ready = FALSE;
323
remote_client_update_status(self, "Connection error");
324
remote_client_start_retry(self);
332
static void remote_client_recv_binary (RemoteClient* self,
335
RemoteClosure* closure = g_queue_pop_tail(self->response_queue);
336
RemoteResponse* response = self->current_binary_response;
337
self->current_binary_response = NULL;
339
g_assert(closure != NULL);
341
response->data = event->buffer;
342
if (closure->callback)
343
closure->callback(self, response, closure->user_data);
346
g_free(response->message);
349
gnet_conn_readline(self->gconn);
352
static void remote_client_recv_line (RemoteClient* self,
355
RemoteResponse *response = g_new0(RemoteResponse, 1);
356
RemoteClosure *closure;
358
response->code = strtol(event->buffer, &response->message, 10);
359
if (*response->message)
361
response->message = g_strdup(response->message);
363
if (response->code == FYRE_RESPONSE_BINARY) {
364
/* Extract the length of the binary response, then start
365
* reading the binary data itself. Note that if we
366
* have a zero-length binary message, skip the next
367
* stage and fall through to processing a normal message.
369
response->data_length = strtol(response->message, NULL, 10);
371
if (response->data_length > 0) {
372
self->current_binary_response = response;
373
gnet_conn_readn(self->gconn, response->data_length);
378
/* We're done, signal the callback and start waiting
379
* for another normal response line.
381
closure = g_queue_pop_tail(self->response_queue);
384
/* This was an answer to some request. Invoke the callback
385
* if one was specified.
387
if (closure->callback)
388
closure->callback(self, response, closure->user_data);
392
/* This was unsolicited- should only occur for the server ready message */
393
if (response->code == FYRE_RESPONSE_READY) {
394
self->is_ready = TRUE;
395
remote_client_update_status(self, "Ready");
398
remote_client_update_status(self, "Protocol error");
402
g_free(response->message);
405
gnet_conn_readline(self->gconn);
409
/************************************************************************************/
410
/************************************************************* High-level Interface */
411
/************************************************************************************/
413
static void set_param_callback (RemoteClient* self,
414
RemoteResponse* response,
417
self->pending_param_changes--;
420
void remote_client_send_param (RemoteClient* self,
424
/* Serialize one parameter value, and send it to the server */
428
GParamSpec *spec = g_object_class_find_property(G_OBJECT_GET_CLASS(ph), name);
429
g_assert(spec != NULL);
431
memset(&val, 0, sizeof(val));
432
g_value_init(&val, spec->value_type);
433
g_object_get_property(G_OBJECT(ph), name, &val);
435
memset(&strval, 0, sizeof(strval));
436
g_value_init(&strval, G_TYPE_STRING);
437
g_value_transform(&val, &strval);
438
string = g_value_get_string(&strval);
440
/* 'string' will be NULL if the value couldn't be serialized- currently
441
* this happens for colors, since we get the GdkColor property rather than
442
* the corresponding string property. Currently this isn't a problem since
443
* render nodes don't deal with colors, but it's something to be aware of.
446
self->pending_param_changes++;
447
remote_client_command(self, set_param_callback, NULL, "set_param %s = %s",
451
g_value_unset(&strval);
455
void remote_client_send_all_params (RemoteClient* self,
458
/* Find all serializable parameters, and send them */
461
GParamSpec** properties;
464
properties = g_object_class_list_properties(G_OBJECT_GET_CLASS(ph), &n_properties);
466
for (i=0; i<n_properties; i++)
467
if (properties[i]->flags & PARAM_SERIALIZED)
468
remote_client_send_param(self, ph, properties[i]->name);
473
static void histogram_merge_callback (RemoteClient* self,
474
RemoteResponse* response,
477
HistogramImager *dest = HISTOGRAM_IMAGER(user_data);
480
self->pending_stream_requests--;
482
if (self->pending_param_changes) {
483
/* This data is for an old parameter set, ignore it.
484
* FIXME: This doesn't distinguish between parameters that
485
* actaully affect calculation and those that don't.
490
if (!response->data_length)
493
histogram_imager_merge_stream(dest, response->data, response->data_length);
495
/* Update our download speed */
496
self->byte_accumulator += response->data_length;
497
elapsed = g_timer_elapsed(self->stream_speed_timer, NULL);
498
if (elapsed > MINIMUM_SPEED_WINDOW) {
499
g_timer_start(self->stream_speed_timer);
500
self->bytes_per_sec = self->byte_accumulator / elapsed;
501
self->byte_accumulator = 0;
505
static void status_merge_callback (RemoteClient* self,
506
RemoteResponse* response,
509
IterativeMap *dest = ITERATIVE_MAP(user_data);
510
double iters, iter_delta;
514
sscanf(response->message, "iterations=%lf density=%ld", &iters, &density);
516
/* FIXME: Since we don't know which parameters affect calculation, we don't
517
* know when the node's iteration counter gets reset. We currently
518
* assume that if it's value decreases, it's been reset.
520
if (iters >= self->prev_iterations) {
521
iter_delta = iters - self->prev_iterations;
524
/* Assume it started at zero */
527
self->prev_iterations = iters;
529
if (self->pending_param_changes)
534
/* Merge this iteration count in */
535
dest->iterations += iter_delta;
537
/* Update our iteration speed */
538
self->iter_accumulator += iter_delta;
539
elapsed = g_timer_elapsed(self->status_speed_timer, NULL);
540
if (elapsed > MINIMUM_SPEED_WINDOW) {
541
g_timer_start(self->status_speed_timer);
542
self->iters_per_sec = self->iter_accumulator / elapsed;
543
self->iter_accumulator = 0;
544
if (self->speed_callback)
545
self->speed_callback(self, self->iters_per_sec, self->bytes_per_sec,
546
self->speed_callback_user_data);
550
void remote_client_merge_results (RemoteClient* self,
555
/* Don't let our stream requests get too backed up */
556
if (self->pending_stream_requests >= 4)
559
/* Always keep the status updated */
560
remote_client_command(self, status_merge_callback, dest,
563
/* Have we waited long enough since the last request? */
564
elapsed = g_timer_elapsed(self->stream_request_timer, NULL);
565
if (elapsed < self->min_stream_interval)
567
g_timer_start(self->stream_request_timer);
569
self->pending_stream_requests++;
570
remote_client_command(self, histogram_merge_callback, dest,
571
"get_histogram_stream");