[rrd-developers] src/rrd_client.c: Simplified parsing of responses.
Tobias Oetiker
tobi at oetiker.ch
Sun Sep 14 17:29:07 CEST 2008
Florian,
patch is in
tobi
Today Florian Forster wrote:
> From: Florian Forster <octo at leeloo.home.verplant.org>
>
> The previous code was broken: The response was read using `read(2)'. If
> the server wasn't sending fast enough, the client would stop reading
> before the entire message had been read.
>
> This patch changes the communication code to use the (line based)
> `fgets' function rather than the lower level `read' function. After
> reading the first line (which contains the total number of lines to be
> expected), this precise number of lines is read - blocking if necessary.
>
> Also, the four missing new statistics values have been added to
> `rrdc_stats_get'.
> ---
> src/rrd_client.c | 443 ++++++++++++++++++++++--------------------------------
> 1 files changed, 177 insertions(+), 266 deletions(-)
>
> diff --git a/src/rrd_client.c b/src/rrd_client.c
> index d1ad5e0..11fb80d 100644
> --- a/src/rrd_client.c
> +++ b/src/rrd_client.c
> @@ -48,84 +48,28 @@ typedef struct rrdc_response_s rrdc_response_t;
>
> static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> static int sd = -1;
> +static FILE *sh = NULL;
> static char *sd_path = NULL; /* cache the path for sd */
> -static void _disconnect(void);
>
> -static ssize_t sread (void *buffer_void, size_t buffer_size) /* {{{ */
> +/* One must hold `lock' when calling `close_connection'. */
> +static void close_connection (void) /* {{{ */
> {
> - char *buffer;
> - size_t buffer_used;
> - size_t buffer_free;
> - ssize_t status;
> -
> - buffer = (char *) buffer_void;
> - buffer_used = 0;
> - buffer_free = buffer_size;
> -
> - while (buffer_free > 0)
> + if (sh != NULL)
> {
> - status = read (sd, buffer + buffer_used, buffer_free);
> - if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
> - continue;
> -
> - if (status < 0)
> - return (-1);
> -
> - if (status == 0)
> - {
> - _disconnect();
> - errno = EPROTO;
> - return (-1);
> - }
> -
> - assert ((0 > status) || (buffer_free >= (size_t) status));
> -
> - buffer_free -= status;
> - buffer_used += status;
> -
> - if (buffer[buffer_used - 1] == '\n')
> - break;
> + fclose (sh);
> + sh = NULL;
> + sd = -1;
> }
> -
> - if (buffer[buffer_used - 1] != '\n')
> + else if (sd >= 0)
> {
> - errno = ENOBUFS;
> - return (-1);
> + close (sd);
> + sd = -1;
> }
>
> - buffer[buffer_used - 1] = '\0';
> - return (buffer_used);
> -} /* }}} ssize_t sread */
> -
> -static ssize_t swrite (const void *buf, size_t count) /* {{{ */
> -{
> - const char *ptr;
> - size_t nleft;
> - ssize_t status;
> -
> - ptr = (const char *) buf;
> - nleft = count;
> -
> - while (nleft > 0)
> - {
> - status = write (sd, (const void *) ptr, nleft);
> -
> - if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
> - continue;
> -
> - if (status < 0)
> - {
> - _disconnect();
> - rrd_set_error("lost connection to rrdcached");
> - return (status);
> - }
> -
> - nleft -= status;
> - ptr += status;
> - }
> -
> - return (0);
> -} /* }}} ssize_t swrite */
> + if (sd_path != NULL)
> + free (sd_path);
> + sd_path = NULL;
> +} /* }}} void close_connection */
>
> static int buffer_add_string (const char *str, /* {{{ */
> char **buffer_ret, size_t *buffer_size_ret)
> @@ -192,103 +136,151 @@ static int buffer_add_value (const char *value, /* {{{ */
> return (buffer_add_string (temp, buffer_ret, buffer_size_ret));
> } /* }}} int buffer_add_value */
>
> -static int response_parse (char *buffer, size_t buffer_size, /* {{{ */
> - rrdc_response_t **ret_response)
> +/* Remove trailing newline (NL) and carriage return (CR) characters. Similar to
> + * the Perl function `chomp'. Returns the number of characters that have been
> + * removed. */
> +static int chomp (char *str) /* {{{ */
> {
> - rrdc_response_t *ret;
> + size_t len;
> + int removed;
> +
> + if (str == NULL)
> + return (-1);
> +
> + len = strlen (str);
> + removed = 0;
> + while ((len > 0) && ((str[len - 1] == '\n') || (str[len - 1] == '\r')))
> + {
> + str[len - 1] = 0;
> + len--;
> + removed++;
> + }
> +
> + return (removed);
> +} /* }}} int chomp */
> +
> +static void response_free (rrdc_response_t *res) /* {{{ */
> +{
> + if (res == NULL)
> + return;
> +
> + if (res->lines != NULL)
> + {
> + size_t i;
> +
> + for (i = 0; i < res->lines_num; i++)
> + if (res->lines[i] != NULL)
> + free (res->lines[i]);
> + free (res->lines);
> + }
> +
> + free (res);
> +} /* }}} void response_free */
>
> - char *dummy;
> - char *saveptr;
> +static int response_read (rrdc_response_t **ret_response) /* {{{ */
> +{
> + rrdc_response_t *ret;
>
> - char *line_ptr;
> - size_t line_counter;
> + char buffer[4096];
> + char *buffer_ptr;
>
> - if (buffer == NULL)
> - return (EINVAL);
> - if (buffer_size <= 0)
> - return (EINVAL);
> + size_t i;
>
> - if (buffer[buffer_size - 1] != 0)
> + if (sh == NULL)
> return (-1);
>
> ret = (rrdc_response_t *) malloc (sizeof (rrdc_response_t));
> if (ret == NULL)
> - return (ENOMEM);
> + return (-2);
> memset (ret, 0, sizeof (*ret));
> + ret->lines = NULL;
> + ret->lines_num = 0;
> +
> + buffer_ptr = fgets (buffer, sizeof (buffer), sh);
> + if (buffer_ptr == NULL)
> + return (-3);
> + chomp (buffer);
> +
> + ret->status = strtol (buffer, &ret->message, 0);
> + if (buffer == ret->message)
> + {
> + response_free (ret);
> + return (-4);
> + }
> + /* Skip leading whitespace of the status message */
> + ret->message += strspn (ret->message, " \t");
>
> - line_counter = 0;
> + if (ret->status <= 0)
> + {
> + *ret_response = ret;
> + return (0);
> + }
>
> - dummy = buffer;
> - saveptr = NULL;
> - while ((line_ptr = strtok_r (dummy, "\r\n", &saveptr)) != NULL)
> + ret->lines = (char **) malloc (sizeof (char *) * ret->status);
> + if (ret->lines == NULL)
> {
> - dummy = NULL;
> + response_free (ret);
> + return (-5);
> + }
> + memset (ret->lines, 0, sizeof (char *) * ret->status);
> + ret->lines_num = (size_t) ret->status;
>
> - if (ret->message == NULL)
> + for (i = 0; i < ret->lines_num; i++)
> + {
> + buffer_ptr = fgets (buffer, sizeof (buffer), sh);
> + if (buffer_ptr == NULL)
> {
> - ret->status = strtol (buffer, &ret->message, 0);
> - if (buffer == ret->message)
> - {
> - free (ret);
> - return (EPROTO);
> - }
> -
> - /* Skip leading whitespace of the status message */
> - ret->message += strspn (ret->message, " \t");
> -
> - if (ret->status > 0)
> - {
> - ret->lines = (char **) malloc (sizeof (char *) * ret->status);
> - if (ret->lines == NULL)
> - {
> - free (ret);
> - return (ENOMEM);
> - }
> - memset (ret->lines, 0, sizeof (char *) * ret->status);
> - ret->lines_num = (size_t) ret->status;
> - }
> - else
> - {
> - ret->lines = NULL;
> - ret->lines_num = 0;
> - }
> + response_free (ret);
> + return (-6);
> }
> - else /* if (ret->message != NULL) */
> + chomp (buffer);
> +
> + ret->lines[i] = strdup (buffer);
> + if (ret->lines[i] == NULL)
> {
> - if (line_counter < ret->lines_num)
> - ret->lines[line_counter] = line_ptr;
> - line_counter++;
> + response_free (ret);
> + return (-7);
> }
> - } /* while (strtok_r) */
> -
> - if (ret->lines_num != line_counter)
> - {
> - errno = EPROTO;
> - if (ret->lines != NULL)
> - free (ret->lines);
> - free (ret);
> - return (-1);
> }
>
> *ret_response = ret;
> return (0);
> -} /* }}} int response_parse */
> +} /* }}} rrdc_response_t *response_read */
>
> -static void response_free (rrdc_response_t *res) /* {{{ */
> +static int request (const char *buffer, size_t buffer_size, /* {{{ */
> + rrdc_response_t **ret_response)
> {
> - if (res == NULL)
> - return;
> + int status;
> + rrdc_response_t *res;
>
> - if (res->lines != NULL)
> + pthread_mutex_lock (&lock);
> +
> + if (sh == NULL)
> {
> - res->lines_num = 0;
> - free (res->lines);
> - res->lines = NULL;
> + pthread_mutex_unlock (&lock);
> + return (ENOTCONN);
> }
>
> - free (res);
> -} /* }}} void response_free */
> + status = (int) fwrite (buffer, buffer_size, /* nmemb = */ 1, sh);
> + if (status != 1)
> + {
> + close_connection ();
> + pthread_mutex_unlock (&lock);
> + return (-1);
> + }
> + fflush (sh);
>
> + res = NULL;
> + status = response_read (&res);
> +
> + pthread_mutex_unlock (&lock);
> +
> + if (status != 0)
> + return (status);
> +
> + *ret_response = res;
> + return (0);
> +} /* }}} int request */
>
> /* determine whether we are connected to the specified daemon_addr if
> * NULL, return whether we are connected at all
> @@ -340,6 +332,15 @@ static int rrdc_connect_unix (const char *path) /* {{{ */
> if (status != 0)
> {
> status = errno;
> + close_connection ();
> + return (status);
> + }
> +
> + sh = fdopen (sd, "r+");
> + if (sh == NULL)
> + {
> + status = errno;
> + close_connection ();
> return (status);
> }
>
> @@ -383,7 +384,15 @@ static int rrdc_connect_network (const char *addr) /* {{{ */
> if (status != 0)
> {
> status = errno;
> - _disconnect();
> + close_connection();
> + continue;
> + }
> +
> + sh = fdopen (sd, "r+");
> + if (sh == NULL)
> + {
> + status = errno;
> + close_connection ();
> continue;
> }
>
> @@ -414,7 +423,7 @@ int rrdc_connect (const char *addr) /* {{{ */
> }
> else
> {
> - _disconnect();
> + close_connection();
> }
>
> if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
> @@ -436,23 +445,11 @@ int rrdc_connect (const char *addr) /* {{{ */
> return (status);
> } /* }}} int rrdc_connect */
>
> -static void _disconnect(void) /* {{{ */
> -{
> - if (sd >= 0)
> - close(sd);
> -
> - if (sd_path != NULL)
> - free(sd_path);
> -
> - sd = -1;
> - sd_path = NULL;
> -} /* }}} static void _disconnect(void) */
> -
> int rrdc_disconnect (void) /* {{{ */
> {
> pthread_mutex_lock (&lock);
>
> - _disconnect();
> + close_connection();
>
> pthread_mutex_unlock (&lock);
>
> @@ -466,6 +463,7 @@ int rrdc_update (const char *filename, int values_num, /* {{{ */
> char *buffer_ptr;
> size_t buffer_free;
> size_t buffer_size;
> + rrdc_response_t *res;
> int status;
> int i;
>
> @@ -493,37 +491,14 @@ int rrdc_update (const char *filename, int values_num, /* {{{ */
> assert (buffer[buffer_size - 1] == ' ');
> buffer[buffer_size - 1] = '\n';
>
> - pthread_mutex_lock (&lock);
> -
> - if (sd < 0)
> - {
> - pthread_mutex_unlock (&lock);
> - return (ENOTCONN);
> - }
> -
> - status = swrite (buffer, buffer_size);
> + res = NULL;
> + status = request (buffer, buffer_size, &res);
> if (status != 0)
> - {
> - pthread_mutex_unlock (&lock);
> - return (status);
> - }
> -
> - status = sread (buffer, sizeof (buffer));
> - if (status < 0)
> - {
> - status = errno;
> - pthread_mutex_unlock (&lock);
> return (status);
> - }
> - else if (status == 0)
> - {
> - pthread_mutex_unlock (&lock);
> - return (ENODATA);
> - }
>
> - pthread_mutex_unlock (&lock);
> + status = res->status;
> + response_free (res);
>
> - status = atoi (buffer);
> return (status);
> } /* }}} int rrdc_update */
>
> @@ -533,6 +508,7 @@ int rrdc_flush (const char *filename) /* {{{ */
> char *buffer_ptr;
> size_t buffer_free;
> size_t buffer_size;
> + rrdc_response_t *res;
> int status;
>
> if (filename == NULL)
> @@ -555,42 +531,18 @@ int rrdc_flush (const char *filename) /* {{{ */
> assert (buffer[buffer_size - 1] == ' ');
> buffer[buffer_size - 1] = '\n';
>
> - pthread_mutex_lock (&lock);
> -
> - if (sd < 0)
> - {
> - pthread_mutex_unlock (&lock);
> - return (ENOTCONN);
> - }
> -
> - status = swrite (buffer, buffer_size);
> + res = NULL;
> + status = request (buffer, buffer_size, &res);
> if (status != 0)
> - {
> - pthread_mutex_unlock (&lock);
> return (status);
> - }
>
> - status = sread (buffer, sizeof (buffer));
> - if (status < 0)
> - {
> - status = errno;
> - pthread_mutex_unlock (&lock);
> - return (status);
> - }
> - else if (status == 0)
> - {
> - pthread_mutex_unlock (&lock);
> - return (ENODATA);
> - }
> -
> - pthread_mutex_unlock (&lock);
> + status = res->status;
> + response_free (res);
>
> - status = atoi (buffer);
> return (status);
> } /* }}} int rrdc_flush */
>
>
> -
> /* convenience function; if there is a daemon specified, or if we can
> * detect one from the environment, then flush the file. Otherwise, no-op
> */
> @@ -619,21 +571,11 @@ int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */
> rrdc_stats_t *head;
> rrdc_stats_t *tail;
>
> - rrdc_response_t *response;
> + rrdc_response_t *res;
>
> - char buffer[4096];
> - size_t buffer_size;
> int status;
> size_t i;
>
> - pthread_mutex_lock (&lock);
> -
> - if (sd < 0)
> - {
> - pthread_mutex_unlock (&lock);
> - return (ENOTCONN);
> - }
> -
> /* Protocol example: {{{
> * -> STATS
> * <- 5 Statistics follow
> @@ -643,63 +585,28 @@ int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */
> * <- TreeNodesNumber: 0
> * <- TreeDepth: 0
> * }}} */
> - status = swrite ("STATS\n", strlen ("STATS\n"));
> - if (status != 0)
> - {
> - pthread_mutex_unlock (&lock);
> - return (status);
> - }
> -
> - status = sread (buffer, sizeof (buffer));
> - if (status < 0)
> - {
> - status = errno;
> - pthread_mutex_unlock (&lock);
> - return (status);
> - }
> - else if (status == 0)
> - {
> - pthread_mutex_unlock (&lock);
> - return (ENODATA);
> - }
> -
> - pthread_mutex_unlock (&lock);
> -
> - /* Assert NULL termination */
> - buffer_size = (size_t) status;
> - if (buffer[buffer_size - 1] != 0)
> - {
> - if (buffer_size < sizeof (buffer))
> - {
> - buffer[buffer_size] = 0;
> - buffer_size++;
> - }
> - else
> - {
> - return (ENOBUFS);
> - }
> - }
>
> - status = response_parse (buffer, buffer_size, &response);
> + res = NULL;
> + status = request ("STATS\n", strlen ("STATS\n"), &res);
> if (status != 0)
> return (status);
>
> - if (response->status <= 0)
> + if (res->status <= 0)
> {
> - response_free (response);
> + response_free (res);
> return (EIO);
> }
>
> head = NULL;
> tail = NULL;
> - for (i = 0; i < response->lines_num; i++)
> + for (i = 0; i < res->lines_num; i++)
> {
> char *key;
> char *value;
> char *endptr;
> rrdc_stats_t *s;
>
> - key = response->lines[i];
> + key = res->lines[i];
> value = strchr (key, ':');
> if (value == NULL)
> continue;
> @@ -718,14 +625,18 @@ int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */
>
> endptr = NULL;
> if ((strcmp ("QueueLength", key) == 0)
> - || (strcmp ("TreeNodesNumber", key) == 0)
> - || (strcmp ("TreeDepth", key) == 0))
> + || (strcmp ("TreeDepth", key) == 0)
> + || (strcmp ("TreeNodesNumber", key) == 0))
> {
> s->type = RRDC_STATS_TYPE_GAUGE;
> s->value.gauge = strtod (value, &endptr);
> }
> - else if ((strcmp ("UpdatesWritten", key) == 0)
> - || (strcmp ("DataSetsWritten", key) == 0))
> + else if ((strcmp ("DataSetsWritten", key) == 0)
> + || (strcmp ("FlushesReceived", key) == 0)
> + || (strcmp ("JournalBytes", key) == 0)
> + || (strcmp ("JournalRotate", key) == 0)
> + || (strcmp ("UpdatesReceived", key) == 0)
> + || (strcmp ("UpdatesWritten", key) == 0))
> {
> s->type = RRDC_STATS_TYPE_COUNTER;
> s->value.counter = (uint64_t) strtoll (value, &endptr, /* base = */ 0);
> @@ -754,9 +665,9 @@ int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */
> tail->next = s;
> tail = s;
> }
> - } /* for (i = 0; i < response->lines_num; i++) */
> + } /* for (i = 0; i < res->lines_num; i++) */
>
> - response_free (response);
> + response_free (res);
>
> if (head == NULL)
> return (EPROTO);
>
--
Tobi Oetiker, OETIKER+PARTNER AG, Aarweg 15 CH-4600 Olten, Switzerland
http://it.oetiker.ch tobi at oetiker.ch ++41 62 775 9902 / sb: -9900
More information about the rrd-developers
mailing list