[rrd-developers] [PATCH] rrdcached "SLURP" for extreme update rates

kevin brintnall kbrint at rufus.net
Wed Oct 1 23:37:43 CEST 2008


This patch introduces a new way to feed updates to 'rrdcached'.  This
feature is designed for sites with an *EXTREMELY* high update rate.

A client on a high-privilege control socket may issue the following
command:

  ->  SLURP /somewhere/filename
  <-  0 Processed 6840502 lines, 6840502 entries, 0 failures

This causes the daemon to read all the UPDATEs in the file and apply them
just as if they had been sent from the control socket that sent the
"SLURP" command.  Essentially, this is on-demand journal replay.

The existing methods for updating rrdcached (via the rrd_update API) are
synchronous by necessity; they have to wait for the server to return the
result code so they can pass the result up to the user.  Unfortunately,
this requires one read() and one write() on the client and the server PER
UPDATE.  These read()/write() are usually VERY small, and end up driving
up the CPU utilization on both client and daemon.

With "SLURP", the files can be both written (client) and read (rrdcached)
using buffered I/O.  The larger read() and write() enable a MUCH faster
update rate with nearly zero CPU utilization.

Note that it's not possible for the user to know the status of an
individual update, since only a single status code is returned to the
user.

At this time, I do not think it's necessary to have hooks in the main RRD
API to interface with "SLURP".  Tobi, I'm open to suggestion if you think
otherwise.

Update rates on commodity hardware (P4 3GHz, FreeBSD 7), comparing SLURP
performance vs. UPDATE (via rrd_update):

  138763 updates/second = SLURP  (slurped files in tempfs)
   23298 updates/second = UPDATE (UNIX domain socket)
   12155 updates/second = UPDATE (TCP unix socket)

---
diff --git a/doc/rrdcached.pod b/doc/rrdcached.pod
index e0571a4..9b26984 100644
--- a/doc/rrdcached.pod
+++ b/doc/rrdcached.pod
@@ -400,6 +400,22 @@ Adds more data to a filename. This is B<the> operation the daemon was designed
 for, so describing the mechanism again is unnecessary. Read L<HOW IT WORKS>
 above for a detailed explanation.
 
+=item B<SLURP> I<filename>
+
+This causes the daemon to read and perform all commands in I<filename>.
+The commands must appear one per line just like they would be sent over a
+control socket (or written to the journal).
+
+This is a very efficient way of passing a large number of commands to the
+daemon, since the file can be both written and read with
+bufferedE<nbsp>I/O.  This reduces the number of read/write calls necessary
+to transfer a large number of B<UPDATES> to the daemon.
+
+If B<-B> is specified, the file must be underneath the base directory.
+Refer to the documentation for B<-B> and B<-b> for more info.
+
+The file should contain only B<UPDATE> and B<FLUSH> commands!
+
 =item B<WROTE> I<filename>
 
 This command is written to the journal after a file is successfully
diff --git a/src/rrd_client.c b/src/rrd_client.c
index 76fded0..0e29690 100644
--- a/src/rrd_client.c
+++ b/src/rrd_client.c
@@ -688,6 +688,7 @@ int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */
         || (strcmp ("JournalBytes", key) == 0)
         || (strcmp ("JournalRotate", key) == 0)
         || (strcmp ("UpdatesReceived", key) == 0)
+        || (strcmp ("SlurpReceived", key) == 0)
         || (strcmp ("UpdatesWritten", key) == 0))
     {
       s->type = RRDC_STATS_TYPE_COUNTER;
diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c
index 604aee3..f457f76 100644
--- a/src/rrd_daemon.c
+++ b/src/rrd_daemon.c
@@ -189,6 +189,7 @@ static int config_listen_address_list_len = 0;
 static uint64_t stats_queue_length = 0;
 static uint64_t stats_updates_received = 0;
 static uint64_t stats_flush_received = 0;
+static uint64_t stats_slurp_received = 0;
 static uint64_t stats_updates_written = 0;
 static uint64_t stats_data_sets_written = 0;
 static uint64_t stats_journal_bytes = 0;
@@ -200,9 +201,13 @@ static char *journal_cur = NULL;
 static char *journal_old = NULL;
 static FILE *journal_fh = NULL;
 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
+static int journal_init_done = 0; /* are we done re-playing old journals? */
+static void journal_init(void);
 static int journal_write(char *cmd, char *args);
 static void journal_done(void);
 static void journal_rotate(void);
+static int journal_replay (const char *file, int log_prio,
+                           int *entry_cnt, int *fail_cnt);
 
 /* 
  * Functions
@@ -986,6 +991,7 @@ static int handle_request_stats (int fd, /* {{{ */
   uint64_t copy_queue_length;
   uint64_t copy_updates_received;
   uint64_t copy_flush_received;
+  uint64_t copy_slurp_received;
   uint64_t copy_updates_written;
   uint64_t copy_data_sets_written;
   uint64_t copy_journal_bytes;
@@ -998,6 +1004,7 @@ static int handle_request_stats (int fd, /* {{{ */
   copy_queue_length       = stats_queue_length;
   copy_updates_received   = stats_updates_received;
   copy_flush_received     = stats_flush_received;
+  copy_slurp_received     = stats_slurp_received;
   copy_updates_written    = stats_updates_written;
   copy_data_sets_written  = stats_data_sets_written;
   copy_journal_bytes      = stats_journal_bytes;
@@ -1019,7 +1026,7 @@ static int handle_request_stats (int fd, /* {{{ */
     return (status); \
   }
 
-  strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
+  strncpy (outbuf, "10 Statistics follow\n", sizeof (outbuf));
   RRDD_STATS_SEND;
 
   snprintf (outbuf, sizeof (outbuf),
@@ -1034,6 +1041,10 @@ static int handle_request_stats (int fd, /* {{{ */
       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
   RRDD_STATS_SEND;
 
+  snprintf (outbuf, sizeof(outbuf),
+      "SlurpReceived: %"PRIu64"\n", copy_slurp_received);
+  RRDD_STATS_SEND;
+
   snprintf (outbuf, sizeof (outbuf),
       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
   RRDD_STATS_SEND;
@@ -1135,6 +1146,56 @@ static int handle_request_flushall(int fd) /* {{{ */
   return (status);
 } /* }}} static int handle_request_flushall */
 
+static int handle_request_slurp (int fd, /* {{{ */
+    char *buffer, size_t buffer_size)
+{
+  char *file;
+  int status;
+  char result[CMD_MAX];
+
+  int lines     = 0;
+  int entry_cnt = 0;
+  int fail_cnt  = 0;
+
+  status = buffer_get_field (&buffer, &buffer_size, &file);
+  if (status != 0)
+  {
+    strncpy (result, "-1 Usage: slurp <filename>\n", sizeof (result));
+    return swrite(fd, result, strlen(result));
+  }
+
+  pthread_mutex_lock(&stats_lock);
+  stats_slurp_received++;
+  pthread_mutex_unlock(&stats_lock);
+
+  if (!check_file_access(file, fd)) return 0;
+
+  errno = 0;
+  lines = journal_replay(file, LOG_DEBUG, &entry_cnt, &fail_cnt);
+
+  if (lines >= 0)
+  {
+    /* return an error if NONE of the lines were good */
+    status = entry_cnt > 0 ? 0 : -1;
+    snprintf(result, sizeof(result)-1,
+             "%d Processed %d lines, %d entries, %d failures\n",
+             status, lines, entry_cnt, fail_cnt);
+  }
+  else
+    snprintf(result, sizeof(result)-1, "-1 %s",
+             errno > 0 ? rrd_strerror(errno) : "Internal error");
+
+  status = swrite (fd, result, strlen (result));
+  if (status < 0)
+  {
+    status = errno;
+    RRDD_LOG (LOG_INFO, "handle_request_slurp: swrite returned an error.");
+    return (status);
+  }
+
+  return (0);
+} /* }}} int handle_request_flush */
+
 static int handle_request_update (int fd, /* {{{ */
     char *buffer, size_t buffer_size)
 {
@@ -1377,15 +1438,14 @@ static int handle_request (int fd, socket_privilege privilege, /* {{{ */
     if (status <= 0)
       return status;
 
-    /* don't re-write updates in replay mode */
-    if (fd >= 0)
-      journal_write(command, buffer_ptr);
+    journal_write(command, buffer_ptr);
 
     return (handle_request_update (fd, buffer_ptr, buffer_size));
   }
-  else if (strcasecmp (command, "wrote") == 0 && fd < 0)
+  else if (strcasecmp (command, "wrote") == 0 &&
+           !journal_init_done && fd < 0) /* initial replay mode */
   {
-    /* this is only valid in replay mode */
+    /* this is only valid in initial replay mode (not slurp) */
     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
   }
   else if (strcasecmp (command, "flush") == 0)
@@ -1400,6 +1460,19 @@ static int handle_request (int fd, socket_privilege privilege, /* {{{ */
 
     return (handle_request_flushall(fd));
   }
+  else if (strcasecmp (command, "slurp") == 0 && fd >= 0)
+  {
+    /* note, we restrict to only "real" fd; we won't accept
+     * a "slurp" from when we're already doing journal replay
+     * due to the possibility of a loop!!
+     */
+
+    status = has_privilege(privilege, PRIV_HIGH, fd);
+    if (status <= 0)
+      return status;
+
+    return (handle_request_slurp(fd, buffer_ptr, buffer_size));
+  }
   else if (strcasecmp (command, "stats") == 0)
   {
     return (handle_request_stats (fd, buffer_ptr, buffer_size));
@@ -1502,6 +1575,10 @@ static int journal_write(char *cmd, char *args) /* {{{ */
   if (journal_fh == NULL)
     return 0;
 
+  /* don't re-write updates when we're replaying old journals */
+  if (!journal_init_done)
+    return 0;
+
   pthread_mutex_lock(&journal_lock);
   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
   pthread_mutex_unlock(&journal_lock);
@@ -1516,11 +1593,11 @@ static int journal_write(char *cmd, char *args) /* {{{ */
   return chars;
 } /* }}} static int journal_write */
 
-static int journal_replay (const char *file) /* {{{ */
+/* returns number of lines processed */
+static int journal_replay (const char *file, int log_prio, /* {{{ */
+                           int *entry_cnt, int *fail_cnt)
 {
   FILE *fh;
-  int entry_cnt = 0;
-  int fail_cnt = 0;
   uint64_t line = 0;
   char entry[CMD_MAX];
 
@@ -1535,15 +1612,12 @@ static int journal_replay (const char *file) /* {{{ */
     return 0;
   }
   else
-    RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
+    RRDD_LOG(log_prio, "replaying from journal: %s", file);
 
-  while(!feof(fh))
+  while (fgets(entry, sizeof(entry), fh) != NULL)
   {
     size_t entry_len;
-
     ++line;
-    if (fgets(entry, sizeof(entry), fh) == NULL)
-      break;
     entry_len = strlen(entry);
 
     /* check \n termination in case journal writing crashed mid-line */
@@ -1551,31 +1625,55 @@ static int journal_replay (const char *file) /* {{{ */
       continue;
     else if (entry[entry_len - 1] != '\n')
     {
-      RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
-      ++fail_cnt;
+      RRDD_LOG(log_prio,
+               "Malformed journal entry in '%s' line %"PRIu64, file, line);
+      (*fail_cnt)++;
       continue;
     }
 
     entry[entry_len - 1] = '\0';
 
     if (handle_request(-1, PRIV_HIGH, entry, entry_len) == 0)
-      ++entry_cnt;
+      (*entry_cnt)++;
     else
-      ++fail_cnt;
+      (*fail_cnt)++;
   }
 
   fclose(fh);
 
-  if (entry_cnt > 0)
+  if (line > 0)
+    RRDD_LOG(log_prio, "Replayed %d entries (%d failures) from '%s'",
+             *entry_cnt, *fail_cnt, file);
+
+  return line;
+} /* }}} static int journal_replay */
+
+static void journal_init(void) /* {{{ */
+{
+  if (journal_cur != NULL)
   {
-    RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
-             entry_cnt, fail_cnt);
-    return 1;
+    int entry_cnt = 0;
+    int fail_cnt  = 0;
+    int lines     = 0;
+
+    pthread_mutex_lock(&journal_lock);
+
+    RRDD_LOG(LOG_INFO, "checking for journal files");
+
+    lines += journal_replay(journal_old, LOG_NOTICE, &entry_cnt, &fail_cnt);
+    lines += journal_replay(journal_cur, LOG_NOTICE, &entry_cnt, &fail_cnt);
+
+    if (entry_cnt > 0)
+      flush_old_values(-1);
+
+    pthread_mutex_unlock(&journal_lock);
+    journal_rotate();
+
+    RRDD_LOG(LOG_INFO, "journal processing complete");
   }
-  else
-    return 0;
 
-} /* }}} static int journal_replay */
+  journal_init_done = 1;
+} /* }}} static void journal_init */
 
 static void *connection_thread_main (void *args) /* {{{ */
 {
@@ -2359,25 +2457,7 @@ int main (int argc, char **argv)
     return (1);
   }
 
-  if (journal_cur != NULL)
-  {
-    int had_journal = 0;
-
-    pthread_mutex_lock(&journal_lock);
-
-    RRDD_LOG(LOG_INFO, "checking for journal files");
-
-    had_journal += journal_replay(journal_old);
-    had_journal += journal_replay(journal_cur);
-
-    if (had_journal)
-      flush_old_values(-1);
-
-    pthread_mutex_unlock(&journal_lock);
-    journal_rotate();
-
-    RRDD_LOG(LOG_INFO, "journal processing complete");
-  }
+  journal_init();
 
   /* start the queue thread */
   memset (&queue_thread, 0, sizeof (queue_thread));



More information about the rrd-developers mailing list