[rrd-developers] src/rrd_client.c: Simplified parsing of responses.

Tobias Oetiker tobi at oetiker.ch
Sun Sep 14 17:29:07 CEST 2008


Florian,

patch is in
tobi

Today Florian Forster wrote:

> From: Florian Forster <octo at leeloo.home.verplant.org>
>
> The previous code was broken: The response was read using `read(2)'. If
> the server wasn't sending fast enough, the client would stop reading
> before the entire message had been read.
>
> This patch changes the communication code to use the (line based)
> `fgets' function rather than the lower level `read' function. After
> reading the first line (which contains the total number of lines to be
> expected), this precise number of lines is read - blocking if necessary.
>
> Also, the missing four new statistic values have been added to
> `rrdc_stats_get'.
> ---
>  src/rrd_client.c |  443 ++++++++++++++++++++++--------------------------------
>  1 files changed, 177 insertions(+), 266 deletions(-)
>
> diff --git a/src/rrd_client.c b/src/rrd_client.c
> index d1ad5e0..11fb80d 100644
> --- a/src/rrd_client.c
> +++ b/src/rrd_client.c
> @@ -48,84 +48,28 @@ typedef struct rrdc_response_s rrdc_response_t;
>
>  static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
>  static int sd = -1;
> +static FILE *sh = NULL;
>  static char *sd_path = NULL; /* cache the path for sd */
> -static void _disconnect(void);
>
> -static ssize_t sread (void *buffer_void, size_t buffer_size) /* {{{ */
> +/* One must hold `lock' when calling `close_connection'. */
> +static void close_connection (void) /* {{{ */
>  {
> -  char    *buffer;
> -  size_t   buffer_used;
> -  size_t   buffer_free;
> -  ssize_t  status;
> -
> -  buffer       = (char *) buffer_void;
> -  buffer_used  = 0;
> -  buffer_free  = buffer_size;
> -
> -  while (buffer_free > 0)
> +  if (sh != NULL)
>    {
> -    status = read (sd, buffer + buffer_used, buffer_free);
> -    if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
> -      continue;
> -
> -    if (status < 0)
> -      return (-1);
> -
> -    if (status == 0)
> -    {
> -      _disconnect();
> -      errno = EPROTO;
> -      return (-1);
> -    }
> -
> -    assert ((0 > status) || (buffer_free >= (size_t) status));
> -
> -    buffer_free -= status;
> -    buffer_used += status;
> -
> -    if (buffer[buffer_used - 1] == '\n')
> -      break;
> +    fclose (sh);
> +    sh = NULL;
> +    sd = -1;
>    }
> -
> -  if (buffer[buffer_used - 1] != '\n')
> +  else if (sd >= 0)
>    {
> -    errno = ENOBUFS;
> -    return (-1);
> +    close (sd);
> +    sd = -1;
>    }
>
> -  buffer[buffer_used - 1] = '\0';
> -  return (buffer_used);
> -} /* }}} ssize_t sread */
> -
> -static ssize_t swrite (const void *buf, size_t count) /* {{{ */
> -{
> -  const char *ptr;
> -  size_t      nleft;
> -  ssize_t     status;
> -
> -  ptr   = (const char *) buf;
> -  nleft = count;
> -
> -  while (nleft > 0)
> -  {
> -    status = write (sd, (const void *) ptr, nleft);
> -
> -    if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
> -      continue;
> -
> -    if (status < 0)
> -    {
> -      _disconnect();
> -      rrd_set_error("lost connection to rrdcached");
> -      return (status);
> -    }
> -
> -    nleft -= status;
> -    ptr   += status;
> -  }
> -
> -  return (0);
> -} /* }}} ssize_t swrite */
> +  if (sd_path != NULL)
> +    free (sd_path);
> +  sd_path = NULL;
> +} /* }}} void close_connection */
>
>  static int buffer_add_string (const char *str, /* {{{ */
>      char **buffer_ret, size_t *buffer_size_ret)
> @@ -192,103 +136,151 @@ static int buffer_add_value (const char *value, /* {{{ */
>    return (buffer_add_string (temp, buffer_ret, buffer_size_ret));
>  } /* }}} int buffer_add_value */
>
> -static int response_parse (char *buffer, size_t buffer_size, /* {{{ */
> -    rrdc_response_t **ret_response)
> +/* Remove trailing newline (NL) and carriage return (CR) characters. Similar to
> + * the Perl function `chomp'. Returns the number of characters that have been
> + * removed. */
> +static int chomp (char *str) /* {{{ */
>  {
> -  rrdc_response_t *ret;
> +  size_t len;
> +  int removed;
> +
> +  if (str == NULL)
> +    return (-1);
> +
> +  len = strlen (str);
> +  removed = 0;
> +  while ((len > 0) && ((str[len - 1] == '\n') || (str[len - 1] == '\r')))
> +  {
> +    str[len - 1] = 0;
> +    len--;
> +    removed++;
> +  }
> +
> +  return (removed);
> +} /* }}} int chomp */
> +
> +static void response_free (rrdc_response_t *res) /* {{{ */
> +{
> +  if (res == NULL)
> +    return;
> +
> +  if (res->lines != NULL)
> +  {
> +    size_t i;
> +
> +    for (i = 0; i < res->lines_num; i++)
> +      if (res->lines[i] != NULL)
> +        free (res->lines[i]);
> +    free (res->lines);
> +  }
> +
> +  free (res);
> +} /* }}} void response_free */
>
> -  char *dummy;
> -  char *saveptr;
> +static int response_read (rrdc_response_t **ret_response) /* {{{ */
> +{
> +  rrdc_response_t *ret;
>
> -  char *line_ptr;
> -  size_t line_counter;
> +  char buffer[4096];
> +  char *buffer_ptr;
>
> -  if (buffer == NULL)
> -    return (EINVAL);
> -  if (buffer_size <= 0)
> -    return (EINVAL);
> +  size_t i;
>
> -  if (buffer[buffer_size - 1] != 0)
> +  if (sh == NULL)
>      return (-1);
>
>    ret = (rrdc_response_t *) malloc (sizeof (rrdc_response_t));
>    if (ret == NULL)
> -    return (ENOMEM);
> +    return (-2);
>    memset (ret, 0, sizeof (*ret));
> +  ret->lines = NULL;
> +  ret->lines_num = 0;
> +
> +  buffer_ptr = fgets (buffer, sizeof (buffer), sh);
> +  if (buffer_ptr == NULL)
> +    return (-3);
> +  chomp (buffer);
> +
> +  ret->status = strtol (buffer, &ret->message, 0);
> +  if (buffer == ret->message)
> +  {
> +    response_free (ret);
> +    return (-4);
> +  }
> +  /* Skip leading whitespace of the status message */
> +  ret->message += strspn (ret->message, " \t");
>
> -  line_counter = 0;
> +  if (ret->status <= 0)
> +  {
> +    *ret_response = ret;
> +    return (0);
> +  }
>
> -  dummy = buffer;
> -  saveptr = NULL;
> -  while ((line_ptr = strtok_r (dummy, "\r\n", &saveptr)) != NULL)
> +  ret->lines = (char **) malloc (sizeof (char *) * ret->status);
> +  if (ret->lines == NULL)
>    {
> -    dummy = NULL;
> +    response_free (ret);
> +    return (-5);
> +  }
> +  memset (ret->lines, 0, sizeof (char *) * ret->status);
> +  ret->lines_num = (size_t) ret->status;
>
> -    if (ret->message == NULL)
> +  for (i = 0; i < ret->lines_num; i++)
> +  {
> +    buffer_ptr = fgets (buffer, sizeof (buffer), sh);
> +    if (buffer_ptr == NULL)
>      {
> -      ret->status = strtol (buffer, &ret->message, 0);
> -      if (buffer == ret->message)
> -      {
> -        free (ret);
> -        return (EPROTO);
> -      }
> -
> -      /* Skip leading whitespace of the status message */
> -      ret->message += strspn (ret->message, " \t");
> -
> -      if (ret->status > 0)
> -      {
> -        ret->lines = (char **) malloc (sizeof (char *) * ret->status);
> -        if (ret->lines == NULL)
> -        {
> -          free (ret);
> -          return (ENOMEM);
> -        }
> -        memset (ret->lines, 0, sizeof (char *) * ret->status);
> -        ret->lines_num = (size_t) ret->status;
> -      }
> -      else
> -      {
> -        ret->lines = NULL;
> -        ret->lines_num = 0;
> -      }
> +      response_free (ret);
> +      return (-6);
>      }
> -    else /* if (ret->message != NULL) */
> +    chomp (buffer);
> +
> +    ret->lines[i] = strdup (buffer);
> +    if (ret->lines[i] == NULL)
>      {
> -      if (line_counter < ret->lines_num)
> -        ret->lines[line_counter] = line_ptr;
> -      line_counter++;
> +      response_free (ret);
> +      return (-7);
>      }
> -  } /* while (strtok_r) */
> -
> -  if (ret->lines_num != line_counter)
> -  {
> -    errno = EPROTO;
> -    if (ret->lines != NULL)
> -      free (ret->lines);
> -    free (ret);
> -    return (-1);
>    }
>
>    *ret_response = ret;
>    return (0);
> -} /* }}} int response_parse */
> +} /* }}} rrdc_response_t *response_read */
>
> -static void response_free (rrdc_response_t *res) /* {{{ */
> +static int request (const char *buffer, size_t buffer_size, /* {{{ */
> +    rrdc_response_t **ret_response)
>  {
> -  if (res == NULL)
> -    return;
> +  int status;
> +  rrdc_response_t *res;
>
> -  if (res->lines != NULL)
> +  pthread_mutex_lock (&lock);
> +
> +  if (sh == NULL)
>    {
> -    res->lines_num = 0;
> -    free (res->lines);
> -    res->lines = NULL;
> +    pthread_mutex_unlock (&lock);
> +    return (ENOTCONN);
>    }
>
> -  free (res);
> -} /* }}} void response_free */
> +  status = (int) fwrite (buffer, buffer_size, /* nmemb = */ 1, sh);
> +  if (status != 1)
> +  {
> +    close_connection ();
> +    pthread_mutex_unlock (&lock);
> +    return (-1);
> +  }
> +  fflush (sh);
>
> +  res = NULL;
> +  status = response_read (&res);
> +
> +  pthread_mutex_unlock (&lock);
> +
> +  if (status != 0)
> +    return (status);
> +
> +  *ret_response = res;
> +  return (0);
> +} /* }}} int request */
>
>  /* determine whether we are connected to the specified daemon_addr if
>   * NULL, return whether we are connected at all
> @@ -340,6 +332,15 @@ static int rrdc_connect_unix (const char *path) /* {{{ */
>    if (status != 0)
>    {
>      status = errno;
> +    close_connection ();
> +    return (status);
> +  }
> +
> +  sh = fdopen (sd, "r+");
> +  if (sh == NULL)
> +  {
> +    status = errno;
> +    close_connection ();
>      return (status);
>    }
>
> @@ -383,7 +384,15 @@ static int rrdc_connect_network (const char *addr) /* {{{ */
>      if (status != 0)
>      {
>        status = errno;
> -      _disconnect();
> +      close_connection();
> +      continue;
> +    }
> +
> +    sh = fdopen (sd, "r+");
> +    if (sh == NULL)
> +    {
> +      status = errno;
> +      close_connection ();
>        continue;
>      }
>
> @@ -414,7 +423,7 @@ int rrdc_connect (const char *addr) /* {{{ */
>    }
>    else
>    {
> -    _disconnect();
> +    close_connection();
>    }
>
>    if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
> @@ -436,23 +445,11 @@ int rrdc_connect (const char *addr) /* {{{ */
>    return (status);
>  } /* }}} int rrdc_connect */
>
> -static void _disconnect(void) /* {{{ */
> -{
> -  if (sd >= 0)
> -    close(sd);
> -
> -  if (sd_path != NULL)
> -    free(sd_path);
> -
> -  sd = -1;
> -  sd_path = NULL;
> -} /* }}} static void _disconnect(void) */
> -
>  int rrdc_disconnect (void) /* {{{ */
>  {
>    pthread_mutex_lock (&lock);
>
> -  _disconnect();
> +  close_connection();
>
>    pthread_mutex_unlock (&lock);
>
> @@ -466,6 +463,7 @@ int rrdc_update (const char *filename, int values_num, /* {{{ */
>    char *buffer_ptr;
>    size_t buffer_free;
>    size_t buffer_size;
> +  rrdc_response_t *res;
>    int status;
>    int i;
>
> @@ -493,37 +491,14 @@ int rrdc_update (const char *filename, int values_num, /* {{{ */
>    assert (buffer[buffer_size - 1] == ' ');
>    buffer[buffer_size - 1] = '\n';
>
> -  pthread_mutex_lock (&lock);
> -
> -  if (sd < 0)
> -  {
> -    pthread_mutex_unlock (&lock);
> -    return (ENOTCONN);
> -  }
> -
> -  status = swrite (buffer, buffer_size);
> +  res = NULL;
> +  status = request (buffer, buffer_size, &res);
>    if (status != 0)
> -  {
> -    pthread_mutex_unlock (&lock);
> -    return (status);
> -  }
> -
> -  status = sread (buffer, sizeof (buffer));
> -  if (status < 0)
> -  {
> -    status = errno;
> -    pthread_mutex_unlock (&lock);
>      return (status);
> -  }
> -  else if (status == 0)
> -  {
> -    pthread_mutex_unlock (&lock);
> -    return (ENODATA);
> -  }
>
> -  pthread_mutex_unlock (&lock);
> +  status = res->status;
> +  response_free (res);
>
> -  status = atoi (buffer);
>    return (status);
>  } /* }}} int rrdc_update */
>
> @@ -533,6 +508,7 @@ int rrdc_flush (const char *filename) /* {{{ */
>    char *buffer_ptr;
>    size_t buffer_free;
>    size_t buffer_size;
> +  rrdc_response_t *res;
>    int status;
>
>    if (filename == NULL)
> @@ -555,42 +531,18 @@ int rrdc_flush (const char *filename) /* {{{ */
>    assert (buffer[buffer_size - 1] == ' ');
>    buffer[buffer_size - 1] = '\n';
>
> -  pthread_mutex_lock (&lock);
> -
> -  if (sd < 0)
> -  {
> -    pthread_mutex_unlock (&lock);
> -    return (ENOTCONN);
> -  }
> -
> -  status = swrite (buffer, buffer_size);
> +  res = NULL;
> +  status = request (buffer, buffer_size, &res);
>    if (status != 0)
> -  {
> -    pthread_mutex_unlock (&lock);
>      return (status);
> -  }
>
> -  status = sread (buffer, sizeof (buffer));
> -  if (status < 0)
> -  {
> -    status = errno;
> -    pthread_mutex_unlock (&lock);
> -    return (status);
> -  }
> -  else if (status == 0)
> -  {
> -    pthread_mutex_unlock (&lock);
> -    return (ENODATA);
> -  }
> -
> -  pthread_mutex_unlock (&lock);
> +  status = res->status;
> +  response_free (res);
>
> -  status = atoi (buffer);
>    return (status);
>  } /* }}} int rrdc_flush */
>
>
> -
>  /* convenience function; if there is a daemon specified, or if we can
>   * detect one from the environment, then flush the file.  Otherwise, no-op
>   */
> @@ -619,21 +571,11 @@ int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */
>    rrdc_stats_t *head;
>    rrdc_stats_t *tail;
>
> -  rrdc_response_t *response;
> +  rrdc_response_t *res;
>
> -  char buffer[4096];
> -  size_t buffer_size;
>    int status;
>    size_t i;
>
> -  pthread_mutex_lock (&lock);
> -
> -  if (sd < 0)
> -  {
> -    pthread_mutex_unlock (&lock);
> -    return (ENOTCONN);
> -  }
> -
>    /* Protocol example: {{{
>     * ->  STATS
>     * <-  5 Statistics follow
> @@ -643,63 +585,28 @@ int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */
>     * <-  TreeNodesNumber: 0
>     * <-  TreeDepth: 0
>     * }}} */
> -  status = swrite ("STATS\n", strlen ("STATS\n"));
> -  if (status != 0)
> -  {
> -    pthread_mutex_unlock (&lock);
> -    return (status);
> -  }
> -
> -  status = sread (buffer, sizeof (buffer));
> -  if (status < 0)
> -  {
> -    status = errno;
> -    pthread_mutex_unlock (&lock);
> -    return (status);
> -  }
> -  else if (status == 0)
> -  {
> -    pthread_mutex_unlock (&lock);
> -    return (ENODATA);
> -  }
> -
> -  pthread_mutex_unlock (&lock);
> -
> -  /* Assert NULL termination */
> -  buffer_size = (size_t) status;
> -  if (buffer[buffer_size - 1] != 0)
> -  {
> -    if (buffer_size < sizeof (buffer))
> -    {
> -      buffer[buffer_size] = 0;
> -      buffer_size++;
> -    }
> -    else
> -    {
> -      return (ENOBUFS);
> -    }
> -  }
>
> -  status = response_parse (buffer, buffer_size, &response);
> +  res = NULL;
> +  status = request ("STATS\n", strlen ("STATS\n"), &res);
>    if (status != 0)
>      return (status);
>
> -  if (response->status <= 0)
> +  if (res->status <= 0)
>    {
> -    response_free (response);
> +    response_free (res);
>      return (EIO);
>    }
>
>    head = NULL;
>    tail = NULL;
> -  for (i = 0; i < response->lines_num; i++)
> +  for (i = 0; i < res->lines_num; i++)
>    {
>      char *key;
>      char *value;
>      char *endptr;
>      rrdc_stats_t *s;
>
> -    key = response->lines[i];
> +    key = res->lines[i];
>      value = strchr (key, ':');
>      if (value == NULL)
>        continue;
> @@ -718,14 +625,18 @@ int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */
>
>      endptr = NULL;
>      if ((strcmp ("QueueLength", key) == 0)
> -        || (strcmp ("TreeNodesNumber", key) == 0)
> -        || (strcmp ("TreeDepth", key) == 0))
> +        || (strcmp ("TreeDepth", key) == 0)
> +        || (strcmp ("TreeNodesNumber", key) == 0))
>      {
>        s->type = RRDC_STATS_TYPE_GAUGE;
>        s->value.gauge = strtod (value, &endptr);
>      }
> -    else if ((strcmp ("UpdatesWritten", key) == 0)
> -        || (strcmp ("DataSetsWritten", key) == 0))
> +    else if ((strcmp ("DataSetsWritten", key) == 0)
> +        || (strcmp ("FlushesReceived", key) == 0)
> +        || (strcmp ("JournalBytes", key) == 0)
> +        || (strcmp ("JournalRotate", key) == 0)
> +        || (strcmp ("UpdatesReceived", key) == 0)
> +        || (strcmp ("UpdatesWritten", key) == 0))
>      {
>        s->type = RRDC_STATS_TYPE_COUNTER;
>        s->value.counter = (uint64_t) strtoll (value, &endptr, /* base = */ 0);
> @@ -754,9 +665,9 @@ int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */
>        tail->next = s;
>        tail = s;
>      }
> -  } /* for (i = 0; i < response->lines_num; i++) */
> +  } /* for (i = 0; i < res->lines_num; i++) */
>
> -  response_free (response);
> +  response_free (res);
>
>    if (head == NULL)
>      return (EPROTO);
>

-- 
Tobi Oetiker, OETIKER+PARTNER AG, Aarweg 15 CH-4600 Olten, Switzerland
http://it.oetiker.ch tobi at oetiker.ch ++41 62 775 9902 / sb: -9900



More information about the rrd-developers mailing list