[rrd-developers] [PATCH] rrdcached: use double-linked list for queue structure
kevin brintnall
kbrint at rufus.net
Tue Sep 30 20:44:57 CEST 2008
Now, moving a value to the head of the queue is O(1). Before it was
O(queue size). This improves performance of individual flushes when
there is a large number of files in the queue. As a result, we don't
hold the cache_lock as much.
Revamped enqueue_cache_item to take advantage of the new structure.
Renamed _wipe_ci_values to look nicer with other code.
---
diff --git a/doc/rrdcached.pod b/doc/rrdcached.pod
index 98376a1..a30f293 100644
--- a/doc/rrdcached.pod
+++ b/doc/rrdcached.pod
@@ -224,7 +224,8 @@ to disk.
+---+----+---+ +------+-----+ +---+----+---+
! File: foo ! ! File: bar ! ! File: qux !
! First: 101 ! ! First: 119 ! ! First: 180 !
- ! Next: ---+--->! Next: ---+---> ... --->! Next: - !
+ ! Next:&bar -+--->! Next:&... -+---> ... --->! Next:NULL !
+ | Prev:NULL !<---+-Prev:&foo !<--- ... ----+-Prev: &... !
+============+ +============+ +============+
! Time: 100 ! ! Time: 120 ! ! Time: 180 !
! Value: 10 ! ! Value: 0.1 ! ! Value: 2,2 !
diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c
index e869a86..604aee3 100644
--- a/src/rrd_daemon.c
+++ b/src/rrd_daemon.c
@@ -128,6 +128,7 @@ struct cache_item_s
#define CI_FLAGS_IN_QUEUE (1<<1)
int flags;
pthread_cond_t flushed;
+ cache_item_t *prev;
cache_item_t *next;
};
@@ -402,7 +403,7 @@ static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
return (0);
} /* }}} ssize_t swrite */
-static void _wipe_ci_values(cache_item_t *ci, time_t when)
+static void wipe_ci_values(cache_item_t *ci, time_t when)
{
ci->values = NULL;
ci->values_num = 0;
@@ -410,10 +411,30 @@ static void _wipe_ci_values(cache_item_t *ci, time_t when)
ci->last_flush_time = when;
if (config_write_jitter > 0)
ci->last_flush_time += (random() % config_write_jitter);
-
- ci->flags &= ~(CI_FLAGS_IN_QUEUE);
}
+/* remove_from_queue
+ * remove a "cache_item_t" item from the queue.
+ * must hold 'cache_lock' when calling this
+ */
+static void remove_from_queue(cache_item_t *ci) /* {{{ */
+{
+ if (ci == NULL) return;
+
+ if (ci->prev == NULL)
+ cache_queue_head = ci->next; /* reset head */
+ else
+ ci->prev->next = ci->next;
+
+ if (ci->next == NULL)
+ cache_queue_tail = ci->prev; /* reset the tail */
+ else
+ ci->next->prev = ci->prev;
+
+ ci->next = ci->prev = NULL;
+ ci->flags &= ~CI_FLAGS_IN_QUEUE;
+} /* }}} static void remove_from_queue */
+
/*
* enqueue_cache_item:
* `cache_lock' must be acquired before calling this function!
@@ -421,8 +442,6 @@ static void _wipe_ci_values(cache_item_t *ci, time_t when)
static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
queue_side_t side)
{
- int did_insert = 0;
-
if (ci == NULL)
return (-1);
@@ -431,67 +450,47 @@ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
if (side == HEAD)
{
- if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
- {
- assert (ci->next == NULL);
- ci->next = cache_queue_head;
- cache_queue_head = ci;
-
- if (cache_queue_tail == NULL)
- cache_queue_tail = cache_queue_head;
+ if (cache_queue_head == ci)
+ return 0;
- did_insert = 1;
- }
- else if (cache_queue_head == ci)
- {
- /* do nothing */
- }
- else /* enqueued, but not first entry */
- {
- cache_item_t *prev;
+ /* remove from the double linked list */
+ if (ci->flags & CI_FLAGS_IN_QUEUE)
+ remove_from_queue(ci);
- /* find previous entry */
- for (prev = cache_queue_head; prev != NULL; prev = prev->next)
- if (prev->next == ci)
- break;
- assert (prev != NULL);
+ ci->prev = NULL;
+ ci->next = cache_queue_head;
+ if (ci->next != NULL)
+ ci->next->prev = ci;
+ cache_queue_head = ci;
- /* move to the front */
- prev->next = ci->next;
- ci->next = cache_queue_head;
- cache_queue_head = ci;
-
- /* check if we need to adapt the tail */
- if (cache_queue_tail == ci)
- cache_queue_tail = prev;
- }
+ if (cache_queue_tail == NULL)
+ cache_queue_tail = cache_queue_head;
}
else /* (side == TAIL) */
{
/* We don't move values back in the list.. */
- if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
+ if (ci->flags & CI_FLAGS_IN_QUEUE)
return (0);
assert (ci->next == NULL);
+ assert (ci->prev == NULL);
+
+ ci->prev = cache_queue_tail;
if (cache_queue_tail == NULL)
cache_queue_head = ci;
else
cache_queue_tail->next = ci;
- cache_queue_tail = ci;
- did_insert = 1;
+ cache_queue_tail = ci;
}
ci->flags |= CI_FLAGS_IN_QUEUE;
- if (did_insert)
- {
- pthread_cond_broadcast(&cache_cond);
- pthread_mutex_lock (&stats_lock);
- stats_queue_length++;
- pthread_mutex_unlock (&stats_lock);
- }
+ pthread_cond_broadcast(&cache_cond);
+ pthread_mutex_lock (&stats_lock);
+ stats_queue_length++;
+ pthread_mutex_unlock (&stats_lock);
return (0);
} /* }}} int enqueue_cache_item */
@@ -684,12 +683,8 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
values = ci->values;
values_num = ci->values_num;
- _wipe_ci_values(ci, time(NULL));
-
- cache_queue_head = ci->next;
- if (cache_queue_head == NULL)
- cache_queue_tail = NULL;
- ci->next = NULL;
+ wipe_ci_values(ci, time(NULL));
+ remove_from_queue(ci);
pthread_mutex_lock (&stats_lock);
assert (stats_queue_length > 0);
@@ -1240,7 +1235,7 @@ static int handle_request_update (int fd, /* {{{ */
return (0);
}
- _wipe_ci_values(ci, now);
+ wipe_ci_values(ci, now);
ci->flags = CI_FLAGS_IN_TREE;
pthread_mutex_lock(&cache_lock);
@@ -1331,7 +1326,8 @@ static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
free(ci->values);
}
- _wipe_ci_values(ci, time(NULL));
+ wipe_ci_values(ci, time(NULL));
+ remove_from_queue(ci);
pthread_mutex_unlock(&cache_lock);
return (0);
More information about the rrd-developers
mailing list