[Babel-users] [PATCH 2/3] Introduce taskqueue for easier tasks and timeout handling

Christof Schulze christof.schulze at gmx.net
Wed Dec 5 23:11:30 GMT 2018


The task queue allows scheduling tasks to run in the future. This
introduces an abstraction layer for scheduling operations that makes it
possible to change the data structure or task handling later and reduces
the complexity of the main loop.
---
  Makefile    |   4 +-
  babeld.c    |  12 +++
  taskqueue.c | 247 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
  taskqueue.h |  40 ++++++++++
  util.h      |   1 +
  5 files changed, 302 insertions(+), 2 deletions(-)
  create mode 100644 taskqueue.c
  create mode 100644 taskqueue.h

diff --git a/Makefile b/Makefile
index 9b98eb8..65407c0 100644
--- a/Makefile
+++ b/Makefile
@@ -11,11 +11,11 @@ LDLIBS = -lrt
  
  SRCS = babeld.c net.c kernel.c util.c interface.c source.c neighbour.c \
         route.c xroute.c message.c resend.c configuration.c local.c \
-       disambiguation.c rule.c
+       disambiguation.c rule.c taskqueue.c
  
  OBJS = babeld.o net.o kernel.o util.o interface.o source.o neighbour.o \
         route.o xroute.o message.o resend.o configuration.o local.o \
-       disambiguation.o rule.o
+       disambiguation.o rule.o taskqueue.o
  
  babeld: $(OBJS)
  	$(CC) $(CFLAGS) $(LDFLAGS) -o babeld $(OBJS) $(LDLIBS)
diff --git a/babeld.c b/babeld.c
index 08923f5..b7d078f 100644
--- a/babeld.c
+++ b/babeld.c
@@ -55,6 +55,7 @@ THE SOFTWARE.
  #include "rule.h"
  #include "version.h"
  #include "error.h"
+#include "taskqueue.h"
  
  struct timeval now;
  
@@ -92,6 +93,8 @@ static int kernel_rules_changed = 0;
  static int kernel_link_changed = 0;
  static int kernel_addr_changed = 0;
  
+taskqueue_ctx queue_ctx = {0}; /* task queue polled by the main loop */
+
  struct timeval check_neighbours_timeout, check_interfaces_timeout;
  
  static volatile sig_atomic_t exiting = 0, dumping = 0, reopening = 0;
@@ -576,6 +579,8 @@ main(int argc, char **argv)
          flushbuf(&ifp->buf);
      }
  
+    taskqueue_init(&queue_ctx);
+
      debugf("Entering main loop.\n");
  
      while(1) {
@@ -618,6 +623,10 @@ main(int argc, char **argv)
                  FD_SET(local_server_socket, &readfds);
                  maxfd = MAX(maxfd, local_server_socket);
              }
+
+            FD_SET(queue_ctx.fd, &readfds);
+            maxfd = MAX(maxfd, queue_ctx.fd);
+
              for(i = 0; i < num_local_sockets; i++) {
                  FD_SET(local_sockets[i].fd, &readfds);
                  maxfd = MAX(maxfd, local_sockets[i].fd);
@@ -671,6 +680,9 @@ main(int argc, char **argv)
              }
          }
  
+	if (FD_ISSET(queue_ctx.fd, &readfds))
+	    taskqueue_run(&queue_ctx);
+
          if(local_server_socket >= 0 && FD_ISSET(local_server_socket, &readfds))
             accept_local_connections();
  
diff --git a/taskqueue.c b/taskqueue.c
new file mode 100644
index 0000000..44992b2
--- /dev/null
+++ b/taskqueue.c
@@ -0,0 +1,247 @@
+/*
+  Copyright (c) 2012-2016, Matthias Schiffer <mschiffer at universe-factory.net>
+  Copyright (c) 2016, Nils Schneider <nils at nilsschneider.net>
+  Copyright (c) 2017-2018, Christof Schulze <christof at christofschulze.com>
+  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions are met:
+
+    1. Redistributions of source code must retain the above copyright notice,
+       this list of conditions and the following disclaimer.
+    2. Redistributions in binary form must reproduce the above copyright
+  notice,
+       this list of conditions and the following disclaimer in the
+  documentation
+       and/or other materials provided with the distribution.
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+  ARE
+  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+  FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+  DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/timerfd.h>
+#include <unistd.h>
+
+#include "taskqueue.h"
+#include "util.h"
+#include "kernel.h"
+#include "error.h"
+
+/** Prepares \e ctx for use: starts with an empty queue and creates the
+ * non-blocking CLOCK_MONOTONIC timerfd stored in \e ctx->fd.
+ *
+ * Terminates the process if the timer descriptor cannot be created, because
+ * a negative descriptor must never be handed to FD_SET() or read().
+ */
+void
+taskqueue_init(taskqueue_ctx *ctx)
+{
+    ctx->queue = NULL;
+
+    ctx->fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
+    if(ctx->fd < 0) {
+        perror("timerfd_create");
+        exit(1);
+    }
+}
+
+/** Computes the point in time at which a task should run.
+ *
+ * Reads the current time with gettime(), moves it forward by \e timeout
+ * seconds and then passes it through timeval_add_msec() together with
+ * \e millisecs.
+ *
+ * \return the resulting timestamp, by value
+ */
+struct timeval
+settime(unsigned int timeout, unsigned int millisecs)
+{
+    struct timeval base, result;
+
+    gettime(&base);
+    base.tv_sec = base.tv_sec + timeout;
+    timeval_add_msec(&result, &base, millisecs);
+
+    return result;
+}
+
+/** Enqueues a new task that becomes due \e timeout seconds plus
+ * \e millisecs milliseconds from now. A task with a timeout of zero is
+ * scheduled immediately.
+ *
+ * \param ctx       queue context the task is added to
+ * \param timeout   delay in seconds
+ * \param millisecs additional delay in milliseconds
+ * \param function  callback invoked with \e data when the task is run
+ * \param cleanup   callback invoked with \e data after \e function;
+ *                  may be NULL
+ * \param data      pointer handed to both callbacks
+ * \return the queued task, or NULL if no memory could be allocated for it
+ */
+taskqueue_t *
+post_task(taskqueue_ctx *ctx, unsigned int timeout, unsigned int millisecs,
+          void (*function)(void *), void (*cleanup)(void *), void *data)
+{
+    taskqueue_t *task = calloc(1, sizeof(*task));
+    if(task == NULL) return NULL;
+
+    /* taskqueue_insert() insists on a completely unlinked element */
+    task->children = task->next = NULL;
+    task->pprev = NULL;
+
+    task->due = settime(timeout, millisecs);
+
+    task->function = function;
+    task->cleanup = cleanup;
+    task->data = data;
+    taskqueue_insert(&ctx->queue, task);
+    taskqueue_schedule(ctx);
+
+    return task;
+}
+
+/** Moves an already queued task to a new due time.
+ *
+ * The new due time is computed by settime() from \e timeout and
+ * \e millisecs. If timeval_compare() reports that it differs from the
+ * task's current due time, the task is taken out of the queue, inserted
+ * again with the new due time, and taskqueue_schedule() is called.
+ *
+ * \return false if \e task is NULL or not linked into a queue, true
+ *         otherwise
+ */
+bool
+reschedule_task(taskqueue_ctx *ctx, taskqueue_t *task, unsigned int timeout,
+                unsigned int millisecs)
+{
+    struct timeval new_due;
+
+    if(task == NULL) return false;
+    if(!taskqueue_linked(task)) return false;
+
+    new_due = settime(timeout, millisecs);
+    if(timeval_compare(&new_due, &task->due) == 0) return true;
+
+    task->due = new_due;
+    taskqueue_remove(task);
+    taskqueue_insert(&ctx->queue, task);
+    taskqueue_schedule(ctx);
+
+    return true;
+}
+
+/** Arms the timer in \e ctx->fd for the due time of the task at the head of
+ * the queue. Does nothing when the queue is empty.
+ *
+ * The due time is handed over as an absolute value (TFD_TIMER_ABSTIME).
+ * NOTE(review): this presumes gettime() reads the same clock the timerfd
+ * was created with -- confirm.
+ */
+void
+taskqueue_schedule(taskqueue_ctx *ctx)
+{
+    if(ctx->queue == NULL) return;
+
+    struct itimerspec t = {
+        .it_value.tv_sec = ctx->queue->due.tv_sec,
+        .it_value.tv_nsec = ctx->queue->due.tv_usec * 1000
+    };
+
+    /* A timer that could not be armed would silently delay the queued
+       tasks, so at least report the failure. */
+    if(timerfd_settime(ctx->fd, TFD_TIMER_ABSTIME, &t, NULL) < 0)
+        perror("timerfd_settime");
+}
+
+/** Handles a wake-up of the timer descriptor in \e ctx->fd.
+ *
+ * Reads the expiration count from the descriptor, then looks at the task at
+ * the head of the queue: if it is due, it is removed from the queue, its
+ * function and (if set) its cleanup callback are called with the task's
+ * data, and the task is freed. At most one task is run per call. Unless the
+ * queue was empty, taskqueue_schedule() is called at the end.
+ */
+void
+taskqueue_run(taskqueue_ctx *ctx)
+{
+    unsigned long long expirations;
+    struct timeval current;
+    taskqueue_t *head;
+
+    debugf("handling taskqueue event\n");
+
+    gettime(&current);
+
+    /* the value read is not used afterwards */
+    read(ctx->fd, &expirations, sizeof(expirations));
+
+    head = ctx->queue;
+    if(head == NULL) return;
+
+    if(timeval_compare(&head->due, &current) <= 0) {
+        taskqueue_remove(head);
+        head->function(head->data);
+
+        if(head->cleanup != NULL) head->cleanup(head->data);
+
+        free(head);
+    }
+
+    taskqueue_schedule(ctx);
+}
+
+/**
+ * Links an element at the position specified by \e queue
+ *
+ * \e elem becomes the first element of the list that \e *queue points to;
+ * the previous first element (if any) becomes its successor. Calls
+ * exit_bug() if \e elem already has a successor.
+ */
+static inline void
+taskqueue_link(taskqueue_t **queue, taskqueue_t *elem)
+{
+    if(elem->next) exit_bug("taskqueue_link: element already linked");
+
+    elem->pprev = queue;
+    elem->next = *queue;
+    if(elem->next) elem->next->pprev = &elem->next; /* old head follows elem */
+
+    *queue = elem;
+}
+
+/**
+ * Unlinks an element
+ *
+ * Makes the pointer that referred to \e elem refer to its successor
+ * instead and clears \e elem->next. \e elem->pprev and \e elem->children
+ * are left untouched. \e elem->pprev is dereferenced unconditionally, so
+ * the element has to be linked when this is called.
+ */
+static inline void
+taskqueue_unlink(taskqueue_t *elem)
+{
+    *elem->pprev = elem->next; /* bypass elem */
+    if(elem->next) elem->next->pprev = elem->pprev;
+
+    elem->next = NULL;
+}
+
+/**
+   Merges two priority queues
+
+   Both arguments must be roots without a successor, otherwise exit_bug()
+   is called. \e queue1 must not be NULL; \e queue2 may be empty (NULL), in
+   which case \e queue1 is returned unchanged.
+
+   The root whose due time compares lower stays the root and the other one
+   is linked in as its first child. When both compare equal, \e queue2
+   stays the root.
+
+   \return the root of the merged queue
+*/
+static taskqueue_t *
+taskqueue_merge(taskqueue_t *queue1, taskqueue_t *queue2)
+{
+    if(!queue1) exit_bug("taskqueue_merge: queue1 unset");
+    if(queue1->next) exit_bug("taskqueue_merge: queue1 has successor");
+    if(!queue2) return queue1;
+    if(queue2->next) exit_bug("taskqueue_merge: queue2 has successor");
+
+    taskqueue_t *lo, *hi;
+
+    if(timeval_compare(&queue1->due, &queue2->due) < 0) {
+        lo = queue1;
+        hi = queue2;
+    } else {
+        lo = queue2;
+        hi = queue1;
+    }
+
+    taskqueue_link(&lo->children, hi);
+
+    return lo;
+}
+
+/**
+ * Merges a list of priority queues
+ *
+ * \e queue0 is the first of a list of roots chained through \e next. The
+ * first two roots are detached from the list and merged with each other,
+ * the remainder of the list is merged recursively, and the two results are
+ * merged into one.
+ *
+ * \return the single remaining root; NULL if \e queue0 is NULL, \e queue0
+ *         itself if it has no successor
+ */
+static taskqueue_t *
+taskqueue_merge_pairs(taskqueue_t *queue0)
+{
+    if(!queue0) return NULL;
+
+    if(!queue0->pprev) exit_bug("taskqueue_merge_pairs: unlinked queue");
+
+    taskqueue_t *queue1 = queue0->next;
+
+    if(!queue1) return queue0;
+
+    taskqueue_t *queue2 = queue1->next; /* rest of the list, may be NULL */
+
+    queue0->next = queue1->next = NULL; /* taskqueue_merge() wants lone roots */
+
+    return taskqueue_merge(taskqueue_merge(queue0, queue1),
+                           taskqueue_merge_pairs(queue2));
+}
+
+/** Inserts a new element into a priority queue
+ *
+ * \e elem has to be completely unlinked (no \e pprev, \e next or
+ * \e children), otherwise exit_bug() is called. Afterwards \e *queue points
+ * to the root of the merged queue and that root's \e pprev points back at
+ * \e queue.
+ */
+void
+taskqueue_insert(taskqueue_t **queue, taskqueue_t *elem)
+{
+    taskqueue_t *root;
+
+    if(elem->pprev != NULL || elem->next != NULL || elem->children != NULL)
+        exit_bug("taskqueue_insert: tried to insert linked queue element");
+
+    root = taskqueue_merge(elem, *queue);
+    *queue = root;
+    root->pprev = queue;
+}
+
+/**
+ * Removes an element from a priority queue
+ *
+ * An element that is not linked is left alone, after a sanity check that it
+ * has neither children nor a successor. Otherwise the element is unlinked
+ * and its children, merged into a single root, are linked into the position
+ * the element occupied. The element ends up fully unlinked; it is not freed.
+ */
+void
+taskqueue_remove(taskqueue_t *elem)
+{
+    if(!taskqueue_linked(elem)) {
+        if(elem->children || elem->next)
+            exit_bug("taskqueue_remove: corrupted queue item");
+
+        return;
+    }
+
+    taskqueue_t **pprev = elem->pprev; /* slot the element currently occupies */
+
+    taskqueue_unlink(elem);
+
+    taskqueue_t *merged = taskqueue_merge_pairs(elem->children);
+    if(merged) taskqueue_link(pprev, merged); /* children take over the slot */
+
+    elem->pprev = NULL;
+    elem->children = NULL;
+}
diff --git a/taskqueue.h b/taskqueue.h
new file mode 100644
index 0000000..a733ad8
--- /dev/null
+++ b/taskqueue.h
@@ -0,0 +1,40 @@
+#pragma once
+
+#include <stdbool.h>
+
+typedef struct taskqueue taskqueue_t;
+
+typedef struct {
+	taskqueue_t *queue; /**< Root of the priority queue, NULL if empty */
+	int fd; /**< Timer descriptor created by taskqueue_init() */
+} taskqueue_ctx;
+
+/** Element of a priority queue */
+struct taskqueue {
+	taskqueue_t **pprev; /**< Address of the pointer that refers to this
+				element: \e next of the previous sibling,
+				\e children of the parent, or the queue
+				root; NULL while unlinked */
+	taskqueue_t *next;   /**< Next sibling in the heap */
+
+	taskqueue_t *children; /**< Heap children */
+
+	struct timeval due; /**< Due time of the task; serves as the priority */
+
+	void (*function)(void *); /**< Called with \e data when the task is run */
+	void (*cleanup)(void *); /**< Called with \e data after \e function,
+				    unless NULL */
+	void *data; /**< Argument passed to both callbacks */
+};
+
+/** Tells whether an element is currently part of a priority queue, i.e.
+ * whether its \e pprev pointer is set */
+static inline bool
+taskqueue_linked(taskqueue_t *elem)
+{
+	return elem->pprev ? true : false;
+}
+
+void taskqueue_insert(taskqueue_t **queue, taskqueue_t *elem);
+void taskqueue_remove(taskqueue_t *elem);
+
+void taskqueue_init(taskqueue_ctx *ctx);
+void taskqueue_run(taskqueue_ctx *ctx);
+void taskqueue_schedule(taskqueue_ctx *ctx);
+taskqueue_t *post_task(taskqueue_ctx *ctx, unsigned int timeout,
+		       unsigned int millisecs, void (*function)(void *),
+		       void (*cleanup)(void *), void *data);
+bool reschedule_task(taskqueue_ctx *ctx, taskqueue_t *task,
+		     unsigned int timeout, unsigned int millisecs);
diff --git a/util.h b/util.h
index 4a80510..51b7ca2 100644
--- a/util.h
+++ b/util.h
@@ -19,6 +19,7 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  THE SOFTWARE.
  */
+#include "babeld.h"
  
  #define DO_NTOHS(_d, _s) \
      do { unsigned short _dd; \
-- 
2.11.0




More information about the Babel-users mailing list