[Git][java-team/trapperkeeper-scheduler-clojure][upstream] New upstream version 1.1.3

Louis-Philippe Véronneau gitlab at salsa.debian.org
Wed Dec 30 19:00:24 GMT 2020



Louis-Philippe Véronneau pushed to branch upstream at Debian Java Maintainers / trapperkeeper-scheduler-clojure


Commits:
3dacb894 by Louis-Philippe Véronneau at 2020-12-19T14:41:38-05:00
New upstream version 1.1.3
- - - - -


17 changed files:

- .gitignore
- .travis.yml
- CHANGELOG.md
- + CODEOWNERS
- + Makefile
- README.md
- + dev-resources/Makefile.i18n
- ext/travisci/test.sh
- + locales/eo.po
- + locales/trapperkeeper-scheduler.pot
- project.clj
- src/puppetlabs/trapperkeeper/services/protocols/scheduler.clj
- + src/puppetlabs/trapperkeeper/services/scheduler/job.clj
- src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
- src/puppetlabs/trapperkeeper/services/scheduler/scheduler_service.clj
- test/integration/puppetlabs/trapperkeeper/services/scheduler/scheduler_service_test.clj
- − test/unit/puppetlabs/trapperkeeper/services/scheduler/scheduler_core_test.clj


Changes:

=====================================
.gitignore
=====================================
@@ -10,3 +10,6 @@ pom.xml.asc
 .lein-plugins/
 .lein-failures
 .nrepl-port
+/resources/locales.clj
+/dev-resources/i18n/bin
+/resources/**/Messages*.class


=====================================
.travis.yml
=====================================
@@ -1,8 +1,8 @@
 language: clojure
-lein: lein2
+lein: 2.9.1
 jdk:
-- oraclejdk7
-- openjdk7
+- openjdk8
+- openjdk11
 script: ./ext/travisci/test.sh
 notifications:
   email: false


=====================================
CHANGELOG.md
=====================================
@@ -1,3 +1,26 @@
+## 1.1.3
+ * ensure a non-nil function is passed to scheduling routines
+ 
+## 1.1.2
+ * add testing for java 11, disambiguate StdSchedulerFactory constructor
+
+## 1.1.1
+ * use a unique scheduler rather than the default scheduler
+
+## 1.1.0
+ * add interface for `interval` and `interval-after` to the
+ protocol and implementation to allow regular cadence jobs.
+ * add support for thread-count configuration value that defaults to 10
+
+## 1.0.1
+ * exclude c3p0 from dependencies, it isn't used.
+
+## 1.0.0
+ * switch from at/at to the Quartz schedule library
+ * update clj-parent and drop support for java 7
+ * reimplement the group-id mapping using quartz internals
+ * add interface for listing the existing job identifiers
+
 ## 0.1.0
  * Add the concept of group-id to the job creation endpoints to allow
  jobs to be grouped together for listing and cancellation.


=====================================
CODEOWNERS
=====================================
@@ -0,0 +1,4 @@
+# This will cause the puppetserver-maintainers group to be assigned
+# review of any opened PRs against the branches containing this file.
+
+* @puppetlabs/puppetserver-maintainers


=====================================
Makefile
=====================================
@@ -0,0 +1 @@
+include dev-resources/Makefile.i18n


=====================================
README.md
=====================================
@@ -29,6 +29,22 @@ The functions that are currently available are as follows:
   Returns an identifier that can be used to reference this scheduled job (e.g.,
   for cancellation) later. A group identifier `group-id` can be provided that
   allows jobs in the same group to be stopped at the same time.
+* `interval [interval-ms f]`: schedules a job that will call `f`, block until
+  that call completes, and then run again at the next logical interval based on
+  `interval-ms` and the original start time.  In other words, `f` will get called every
+  `interval-ms` unless the execution time for `f` exceeds `interval-ms`, in which case
+  that execution is skipped.
+  Returns an identifier that can be used to reference this scheduled job (e.g.,
+  for cancellation) later.
+* `interval [interval-ms f group-id]`: schedules a job that will call `f`, block until
+  that call completes, and then run again at the next logical interval based on
+  `interval-ms` and the original start time.  In other words, `f` will get called every
+  `interval-ms` unless the execution time for `f` exceeds `interval-ms`, in which case
+  that execution is skipped. If there are insufficient threads in the thread pool to
+  run the interval job at the time of execution, the job will be skipped. Returns an
+  identifier that can be used to reference this scheduled job (e.g.,
+  for cancellation) later. A group identifier `group-id` can be provided that
+  allows jobs in the same group to be stopped at the same time.
 * `after [interval-ms f]`: schedules a job that will call `f` a single time, after
   a delay of `interval-ms` milliseconds.  Returns an identifier that can be used
   to reference this scheduled job (e.g. for cancellation) later.
@@ -37,6 +53,11 @@ The functions that are currently available are as follows:
   to reference this scheduled job (e.g. for cancellation) later. A group identifier
   `group-id` can be provided that allows jobs in the same group to be stopped
   at the same time.
+* `interval-after [initial-delay-ms interval-ms f]`: Similar to `interval` but delays
+  initial execution until `initial-delay-ms` has elapsed.
+* `interval-after [initial-delay-ms interval-ms f group-id]`: Similar to `interval` but delays
+  initial execution until `initial-delay-ms` has elapsed. A group identifier `group-id` can be
+  provided that allows jobs in the same group to be stopped at the same time.
 * `stop-job [job-id]`: Given a `job-id` returned by one of the previous functions,
   cancels the job.  If the job is currently executing it will be allowed to complete,
   but will not be invoked again afterward.  Returns `true` if the job was successfully
@@ -53,32 +74,23 @@ The functions that are currently available are as follows:
 * `count-jobs [group-id]`: Return a count of the total number of scheduled jobs
  with the associated `group-id` known to the scheduling service.
   `after` jobs that have completed won't be included in the total.
+* `get-jobs []`: Return a list of the current job identifiers.
+* `get-jobs [group-id]`: Return a list of the current job identifiers associated
+  with the specified group identifier.
 
 ### Implementation Details
 
-The current implementation of the `SchedulerService` is a fairly thin wrapper around
-the [`overtone/at-at`](https://github.com/overtone/at-at) library.  This approach
-was chosen for a couple of reasons:
-
-* A few existing PL projects already use this library, so we thought it'd be better
-  not to introduce a different scheduling subsystem until all of the PL TK projects
-  are aligned to use the SchedulerService.
-* The `at-at` API seems like a pretty reasonable Clojure API for scheduling tasks.
+A configuration value is available under `scheduler -> thread-count` that controls
+the number of threads used internally by the Quartz library for job scheduling.
+If not specified, it defaults to 10, which is the Quartz internal default.
 
-It would probably be a good idea to switch the implementation out to use a different
-backend in the future; without having done too much investigation yet, I'd be
-interested in looking into Quartz/Quartzite, simply because Quartz is very widely-used
-and battle-tested in the Java world.  `at-at` does not appear to be being maintained
-any longer.  We've had a few minor issues with it (especially w/rt shutting down
-the system), and haven't gotten any responses from the maintainer on the github
-issues we've opened.  Also, the source repo for `at-at` contains no tests :(
+The current implementation of the `SchedulerService` is a wrapper around
+the [`org.quartz-scheduler/quartz`](http://www.quartz-scheduler.org/) library.
 
 ### What's Next?
 
-* Add additional scheduling API functions, e.g. `every`.
+* Add additional scheduling API functions with more complicated recurring models.
 * Add API for introspecting the state of currently scheduled jobs
-* Consider porting the backend to something more sophisticated (and maintained)
-  than `at-at`; if we do this, the intent would be to maintain the existing API.
 
 # Support
 

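The difference between the `interspaced` and `interval` recurrence policies described above can be made concrete with a small simulation. This is a hedged Python sketch of the documented semantics only, not the service's actual Quartz-backed implementation; the function name and arguments are illustrative:

```python
# Hypothetical simulation of the two recurrence policies:
#  - "interspaced": next run starts interval_ms after the previous run COMPLETES
#  - "interval":    runs stay aligned to start + k*interval_ms; a slot that
#                   passes while the previous run is still executing is skipped
def next_fire_times(policy, interval_ms, exec_times_ms, start_ms=0):
    """Return the times (ms) at which each invocation starts."""
    fires = []
    t = start_ms
    for exec_ms in exec_times_ms:
        fires.append(t)
        done = t + exec_ms  # when this invocation finishes
        if policy == "interspaced":
            t = done + interval_ms
        else:  # "interval": fast-forward to the first aligned slot at/after completion
            k = 1
            while start_ms + k * interval_ms < done:
                k += 1
            t = start_ms + k * interval_ms
    return fires
```

For example, with a 100 ms interval and a middle invocation that runs for 250 ms, the `interval` policy skips the slots at 200 and 300 and fires at 0, 100, 400, while `interspaced` always waits a full 100 ms after each completion.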

=====================================
dev-resources/Makefile.i18n
=====================================
@@ -0,0 +1,136 @@
+# -*- Makefile -*-
+# This file was generated by the i18n leiningen plugin
+# Do not edit this file; it will be overwritten the next time you run
+#   lein i18n init
+#
+
+# The name of the package into which the translations bundle will be placed
+BUNDLE=puppetlabs.trapperkeeper_scheduler
+
+# The name of the POT file into which the gettext code strings (msgid) will be placed
+POT_NAME=trapperkeeper-scheduler.pot
+
+# The list of names of packages covered by the translation bundle;
+# by default it contains a single package - the same where the translations
+# bundle itself is placed - but this can be overridden - preferably in
+# the top level Makefile
+PACKAGES?=$(BUNDLE)
+LOCALES=$(basename $(notdir $(wildcard locales/*.po)))
+BUNDLE_DIR=$(subst .,/,$(BUNDLE))
+BUNDLE_FILES=$(patsubst %,resources/$(BUNDLE_DIR)/Messages_%.class,$(LOCALES))
+FIND_SOURCES=find src -name \*.clj
+# xgettext before 0.19 does not understand --add-location=file. Even CentOS
+# 7 ships with an older gettext. We will therefore generate full location
+# info on those systems, and only file names where xgettext supports it
+LOC_OPT=$(shell xgettext --add-location=file -f - </dev/null >/dev/null 2>&1 && echo --add-location=file || echo --add-location)
+
+LOCALES_CLJ=resources/locales.clj
+define LOCALES_CLJ_CONTENTS
+{
+  :locales  #{$(patsubst %,"%",$(LOCALES))}
+  :packages [$(patsubst %,"%",$(PACKAGES))]
+  :bundle   $(patsubst %,"%",$(BUNDLE).Messages)
+}
+endef
+export LOCALES_CLJ_CONTENTS
+
+
+i18n: msgfmt
+
+# Update locales/<project-name>.pot
+update-pot: locales/$(POT_NAME)
+
+locales/$(POT_NAME): $(shell $(FIND_SOURCES)) | locales
+	@tmp=$$(mktemp $@.tmp.XXXX);                                            \
+	$(FIND_SOURCES)                                                         \
+	    | xgettext --from-code=UTF-8 --language=lisp                        \
+	               --copyright-holder='Puppet <docs at puppet.com>'            \
+	               --package-name="$(BUNDLE)"                               \
+	               --package-version="$(BUNDLE_VERSION)"                    \
+	               --msgid-bugs-address="docs at puppet.com"                   \
+	               -k                                                       \
+	               -kmark:1 -ki18n/mark:1                                   \
+	               -ktrs:1 -ki18n/trs:1                                     \
+	               -ktru:1 -ki18n/tru:1                                     \
+	               -ktrun:1,2 -ki18n/trun:1,2                               \
+	               -ktrsn:1,2 -ki18n/trsn:1,2                               \
+	               $(LOC_OPT)                                               \
+	               --add-comments --sort-by-file                            \
+	               -o $$tmp -f -;                                           \
+	sed -i.bak -e 's/charset=CHARSET/charset=UTF-8/' $$tmp;                 \
+	sed -i.bak -e 's/POT-Creation-Date: [^\\]*/POT-Creation-Date: /' $$tmp; \
+	rm -f $$tmp.bak;                                                        \
+	if ! diff -q -I POT-Creation-Date $$tmp $@ >/dev/null 2>&1; then        \
+	    mv $$tmp $@;                                                        \
+	else                                                                    \
+	    rm $$tmp; touch $@;                                                 \
+	fi
+
+# Run msgfmt over all .po files to generate Java resource bundles
+# and create the locales.clj file
+msgfmt: $(BUNDLE_FILES) $(LOCALES_CLJ) clean-orphaned-bundles
+
+# Force rebuild of locales.clj if its content is not the desired one. The
+# shell echo is used to add a trailing newline to match the one from `cat`
+ifneq ($(shell cat $(LOCALES_CLJ) 2> /dev/null),$(shell echo '$(LOCALES_CLJ_CONTENTS)'))
+.PHONY: $(LOCALES_CLJ)
+endif
+$(LOCALES_CLJ): | resources
+	@echo "Writing $@"
+	@echo "$$LOCALES_CLJ_CONTENTS" > $@
+
+# Remove every resource bundle that wasn't generated from a PO file.
+# We do this because we used to generate the english bundle directly from the POT.
+.PHONY: clean-orphaned-bundles
+clean-orphaned-bundles:
+	@for bundle in resources/$(BUNDLE_DIR)/Messages_*.class; do                                  \
+	  locale=$$(basename "$$bundle" | sed -E -e 's/\$$?1?\.class$$/_class/' | cut -d '_' -f 2;); \
+	  if [ ! -f "locales/$$locale.po" ]; then                                                    \
+	    rm "$$bundle";                                                                           \
+	  fi                                                                                         \
+	done
+
+resources/$(BUNDLE_DIR)/Messages_%.class: locales/%.po | resources
+	msgfmt --java2 -d resources -r $(BUNDLE).Messages -l $(*F) $<
+
+# Use this to initialize translations. Updating the PO files is done
+# automatically through a CI job that utilizes the scripts in the project's
+# `bin` file, which themselves come from the `clj-i18n` project.
+locales/%.po: | locales
+	@if [ ! -f $@ ]; then                                         \
+	    touch $@ && msginit --no-translator -l $(*F) -o $@ -i locales/$(POT_NAME); \
+	fi
+
+resources locales:
+	@mkdir $@
+
+help:
+	$(info $(HELP))
+	@echo
+
+.PHONY: help
+
+define HELP
+This Makefile assists in handling i18n related tasks during development. Files
+that need to be checked into source control are put into the locales/ directory.
+They are
+
+  locales/$(POT_NAME)   - the POT file generated by 'make update-pot'
+  locales/$$LANG.po       - the translations for $$LANG
+
+Only the $$LANG.po files should be edited manually; this is usually done by
+translators.
+
+You can use the following targets:
+
+  i18n:             refresh all the files in locales/ and recompile resources
+  update-pot:       extract strings and update locales/$(POT_NAME)
+  locales/LANG.po:  create translations for LANG
+  msgfmt:           compile the translations into Java classes; this step is
+                    needed to make translations available to the Clojure code
+                    and produces Java class files in resources/
+endef
+# @todo lutter 2015-04-20: for projects that use libraries with their own
+# translation, we need to combine all their translations into one big po
+# file and then run msgfmt over that so that we only have to deal with one
+# resource bundle

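The `clean-orphaned-bundles` target above recovers a locale name from a compiled bundle file name via a `sed`/`cut` pipeline. A hedged Python sketch of that extraction (illustrative only; the Makefile does this in shell):

```python
import re

def bundle_locale(filename):
    """'Messages_eo.class' or 'Messages_eo$1.class' -> 'eo'.

    Mirrors the shell pipeline:
        sed -E -e 's/\\$?1?\\.class$/_class/' | cut -d '_' -f 2
    """
    # Rewrite the '.class' (or '$1.class') suffix as '_class' so the locale
    # is always the second '_'-separated field.
    stem = re.sub(r"\$?1?\.class$", "_class", filename)
    return stem.split("_")[1]
```

A bundle whose extracted locale has no matching `locales/<locale>.po` file is then deleted.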

=====================================
ext/travisci/test.sh
=====================================
@@ -1,3 +1,3 @@
 #!/bin/bash
 
-lein2 test
+lein test


=====================================
locales/eo.po
=====================================
@@ -0,0 +1,30 @@
+# Esperanto translations for puppetlabs.trapperkeeper_scheduler package.
+# Copyright (C) 2017 Puppet <docs at puppet.com>
+# This file is distributed under the same license as the puppetlabs.trapperkeeper_scheduler package.
+# Automatically generated, 2017.
+#
+msgid ""
+msgstr ""
+"Project-Id-Version: puppetlabs.trapperkeeper_scheduler \n"
+"Report-Msgid-Bugs-To: docs at puppet.com\n"
+"POT-Creation-Date: \n"
+"PO-Revision-Date: \n"
+"Last-Translator: Automatically generated\n"
+"Language-Team: none\n"
+"Language: eo\n"
+"MIME-Version: 1.0\n"
+"Content-Type: text/plain; charset=UTF-8\n"
+"Content-Transfer-Encoding: 8bit\n"
+"Plural-Forms: nplurals=2; plural=(n != 1);\n"
+
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
+msgid "scheduled job threw error"
+msgstr ""
+
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_service.clj
+msgid "Initializing Scheduler Service"
+msgstr ""
+
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_service.clj
+msgid "Shutting down Scheduler Service"
+msgstr ""


=====================================
locales/trapperkeeper-scheduler.pot
=====================================
@@ -0,0 +1,68 @@
+# SOME DESCRIPTIVE TITLE.
+# Copyright (C) YEAR Puppet <docs at puppet.com>
+# This file is distributed under the same license as the puppetlabs.trapperkeeper_scheduler package.
+# FIRST AUTHOR <EMAIL at ADDRESS>, YEAR.
+#
+#, fuzzy
+msgid ""
+msgstr ""
+"Project-Id-Version: puppetlabs.trapperkeeper_scheduler \n"
+"X-Git-Ref: ccf78ef0d6b688ce8fc24988277ddf04de9f8945\n"
+"Report-Msgid-Bugs-To: docs at puppet.com\n"
+"POT-Creation-Date: \n"
+"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
+"Last-Translator: FULL NAME <EMAIL at ADDRESS>\n"
+"Language-Team: LANGUAGE <LL at li.org>\n"
+"Language: \n"
+"MIME-Version: 1.0\n"
+"Content-Type: text/plain; charset=UTF-8\n"
+"Content-Transfer-Encoding: 8bit\n"
+
+#: src/puppetlabs/trapperkeeper/services/scheduler/job.clj
+msgid "Skipping execution of job {0} because of missed interval."
+msgstr ""
+
+#: src/puppetlabs/trapperkeeper/services/scheduler/job.clj
+msgid "scheduled job threw error"
+msgstr ""
+
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
+msgid "Scheduled function must be non-nil"
+msgstr ""
+
+#. this can occur if the interface is being used while the scheduler is shutdown
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
+msgid "Failed to schedule job"
+msgstr ""
+
+#. this can occur if the interface is being used while the scheduler is shutdown
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
+msgid "Failure stopping job"
+msgstr ""
+
+#. this can occur if the interface is being used while the scheduler is shutdown
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
+msgid "Failure getting all jobs"
+msgstr ""
+
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
+msgid "Failed to shutdown schedule service in {0} seconds"
+msgstr ""
+
+#. this can occur if the interface is being used while the scheduler is shutdown
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
+msgid "Failure stopping all jobs"
+msgstr ""
+
+#. this can occur if the function is called when the scheduler is shutdown
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
+msgid "Failure getting jobs in group"
+msgstr ""
+
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_service.clj
+msgid "Initializing Scheduler Service"
+msgstr ""
+
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_service.clj
+msgid "Shutting down Scheduler Service"
+msgstr ""


=====================================
project.clj
=====================================
@@ -1,14 +1,16 @@
-(def tk-version "1.1.1")
-(def ks-version "1.0.0")
-
-(defproject puppetlabs/trapperkeeper-scheduler "0.1.0"
+(defproject puppetlabs/trapperkeeper-scheduler "1.1.3"
   :description "Trapperkeeper Scheduler Service"
 
-  :dependencies [[org.clojure/clojure "1.6.0"]
-                 [puppetlabs/trapperkeeper ~tk-version]
-                 [puppetlabs/kitchensink ~ks-version]
-                 [prismatic/schema "0.4.0"]
-                 [overtone/at-at "1.2.0"]]
+  :dependencies [[org.clojure/clojure]
+                 [puppetlabs/trapperkeeper]
+                 [puppetlabs/i18n]
+                 [puppetlabs/kitchensink]
+                 [org.quartz-scheduler/quartz "2.3.2" :exclusions [c3p0]]]
+
+  :min-lein-version "2.9.1"
+
+  :parent-project {:coords [puppetlabs/clj-parent "4.6.7"]
+                   :inherit [:managed-dependencies]}
 
   :pedantic? :abort
 
@@ -23,10 +25,10 @@
                                      :sign-releases false}]]
 
   :profiles {:dev {:source-paths ["dev"]
-                   :dependencies [[org.clojure/tools.namespace "0.2.4"]
-                                  [puppetlabs/trapperkeeper ~tk-version :classifier "test" :scope "test"]
-                                  [puppetlabs/kitchensink ~ks-version :classifier "test" :scope "test"]
-                                  [spyscope "0.1.4" :exclusions [clj-time]]]
-                   :injections [(require 'spyscope.core)]}}
+                   :dependencies [[puppetlabs/trapperkeeper :classifier "test" :scope "test"]
+                                  [puppetlabs/kitchensink :classifier "test" :scope "test"]]}}
 
+  :plugins  [[lein-parent "0.3.7"]
+             [puppetlabs/i18n "0.8.0"]]
+  :aot [puppetlabs.trapperkeeper.services.scheduler.job]
   :repl-options {:init-ns user})


=====================================
src/puppetlabs/trapperkeeper/services/protocols/scheduler.clj
=====================================
@@ -19,6 +19,26 @@
     group can be provided to associated jobs with each other to allow
     them to be stopped together.")
 
+  (interval
+    [this n f]
+    [this n f group-id]
+   "Calls 'f' repeatedly with a delay of 'n' milliseconds between the
+    beginning of a given invocation and the beginning of the following
+    invocation. If an invocation's execution time is longer than the interval,
+    the subsequent invocation is skipped.
+    Returns an identifier for the scheduled job. An optional
+    group-id can be provided to collect a set of jobs into one group to allow
+    them to be stopped together.")
+
+  (interval-after
+    [this initial-delay repeat-delay f]
+    [this initial-delay repeat-delay f group-id]
+    "Calls 'f' repeatedly with a delay of 'repeat-delay' milliseconds between
+    invocations, starting after an initial delay of 'initial-delay' milliseconds.
+    Returns an identifier for the scheduled job. An optional
+    group-id can be provided to collect a set of jobs into one group to allow
+    them to be stopped together.")
+
+
   (stop-job [this job]
     "Given an identifier of a scheduled job, stop its execution.  If an
     invocation of the job is currently executing, it will be allowed to
@@ -38,4 +58,12 @@
     [this group-id]
     "Return the number of jobs known to the scheduler service, or the number
     of jobs known to the scheduler service by group id. A nil group-id
-    will return the count of all jobs."))
+    will return the count of all jobs.")
+
+  (get-jobs
+   [this]
+   [this group-id]
+   "Return all the known job identifiers, or the job identifiers associated
+   with the given group."))
+
+

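The group-id bookkeeping implied by the protocol (jobs optionally tagged with a group so the group can be listed, counted, or stopped together, with a nil group meaning "all jobs") can be modeled with a toy in-memory registry. This is a hypothetical Python sketch; the real service keeps this state inside Quartz job keys:

```python
import itertools

class JobRegistry:
    """Toy model of the protocol's grouping semantics (illustrative only)."""

    def __init__(self):
        self._jobs = {}  # job-id -> group-id (or None for ungrouped)
        self._next_id = itertools.count()

    def schedule(self, group_id=None):
        """Register a job and return its identifier."""
        job_id = next(self._next_id)
        self._jobs[job_id] = group_id
        return job_id

    def get_jobs(self, group_id=None):
        # A nil/None group-id means "all jobs", matching count-jobs above.
        if group_id is None:
            return sorted(self._jobs)
        return sorted(j for j, g in self._jobs.items() if g == group_id)

    def count_jobs(self, group_id=None):
        return len(self.get_jobs(group_id))

    def stop_job(self, job_id):
        # True if the job existed and was removed, False otherwise.
        if job_id in self._jobs:
            del self._jobs[job_id]
            return True
        return False
```

Stopping a whole group is then just `stop_job` over `get_jobs(group_id)`.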

=====================================
src/puppetlabs/trapperkeeper/services/scheduler/job.clj
=====================================
@@ -0,0 +1,74 @@
+(ns puppetlabs.trapperkeeper.services.scheduler.job
+  (:gen-class
+   :name puppetlabs.trapperkeeper.services.scheduler.job
+   :state state
+   :init init
+   :constructors {[] []}
+   :implements [org.quartz.StatefulJob org.quartz.InterruptableJob]
+   :prefix "-")
+  (:require [clojure.tools.logging :as log]
+            [puppetlabs.i18n.core :as i18n])
+  (:import (org.quartz JobExecutionContext JobDataMap JobExecutionException DateBuilder DateBuilder$IntervalUnit)
+           (java.util Date)))
+
+(defn -init
+  []
+  [[] (atom {})])
+
+(defn- recurring?
+  [options]
+  (or (contains? options :interval) (contains? options :interspaced)))
+
+(defn- calculate-next-execution-time
+  [context options]
+  (if (contains? options :interspaced)
+    (Date. ^Long (+ (System/currentTimeMillis) (:interspaced options)))
+    (.getFireTimeAfter (.getTrigger context) (Date.))))
+
+(defn- should-skip?
+  [context options]
+  (when (contains? options :interval)
+    (let [interval-ms (:interval options)
+          now-ms (System/currentTimeMillis)
+          scheduled-fire-time-ms (.getTime (.getScheduledFireTime context))]
+      ; if the scheduled execution time is an interval or more away, skip it.
+      (> now-ms (+ scheduled-fire-time-ms interval-ms)))))
+
+(defn -execute
+  [this ^JobExecutionContext context]
+  (try
+    (let [^JobDataMap merged (.getMergedJobDataMap context)
+          options (.get merged "jobData")
+          f (:job options)]
+      (swap! (.state this) into {:current-thread (Thread/currentThread)})
+
+      (if-not (should-skip? context options)
+        (f)
+        (log/info (i18n/trs "Skipping execution of job {0} because of missed interval." (.toString (.getKey (.getJobDetail context))))))
+
+      ; using quartz interval execution does not take into account the
+      ; execution time of the actual job.  For interspaced jobs, this means
+      ; triggering the job after this one completes, for interval jobs,
+      ; this means fast-forwarding the execution time to the next logical
+      ; one
+      (when (recurring? options)
+        (let [scheduler (.getScheduler context)
+              oldTrigger (.getTrigger context)
+              future-date (calculate-next-execution-time context options)
+              trigger (-> (.getTriggerBuilder oldTrigger)
+                          (.startAt future-date)
+                          (.build))]
+          (.rescheduleJob scheduler (.getKey oldTrigger) trigger))))
+
+
+    (catch Throwable e
+      (log/error e (i18n/trs "scheduled job threw error"))
+      (let [new-exception (JobExecutionException. ^Throwable e)]
+        (.setUnscheduleFiringTrigger new-exception true)
+        (throw new-exception)))))
+
+(defn -interrupt
+  [this]
+  (when-let [thread (:current-thread @(.state this))]
+    (.interrupt thread)))
+

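The skip-and-reschedule logic in the job above hinges on two small decisions: skip an `interval` run whose scheduled slot is already more than one interval stale, and pick the next run time differently for the two policies. A hedged Python sketch of those two predicates (the real code is the Clojure above, reading fire times from the Quartz `JobExecutionContext`):

```python
def should_skip(now_ms, scheduled_fire_ms, options):
    """Skip an 'interval' job whose scheduled slot is a full interval old."""
    if "interval" not in options:
        return False  # 'interspaced' and one-shot jobs are never skipped here
    return now_ms > scheduled_fire_ms + options["interval"]

def next_execution_ms(now_ms, options, trigger_next_fire_ms):
    """Next run time: a fixed delay after completion for 'interspaced',
    the trigger's next aligned slot for 'interval' jobs."""
    if "interspaced" in options:
        return now_ms + options["interspaced"]
    return trigger_next_fire_ms
```

This mirrors why the rescheduling is done in the job itself: Quartz's plain interval execution does not account for how long the job actually ran.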

=====================================
src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
=====================================
@@ -1,63 +1,169 @@
 (ns puppetlabs.trapperkeeper.services.scheduler.scheduler-core
-  (:require [overtone.at-at :as at-at]
-            [clojure.tools.logging :as log]))
-
-(defn create-pool
-  "Creates and returns a thread pool which can be used for scheduling jobs."
-  []
-  (at-at/mk-pool))
-
-(defn wrap-with-error-logging
-  "Returns a function that will invoke 'f' inside a try/catch block.  If an
-  error occurs during execution of 'f', it will be logged and re-thrown."
-  [f]
-  (fn []
-    (try
-      (f)
-      (catch Throwable t
-        (log/error t "scheduled job threw error")
-        (throw t)))))
+  (:require [clojure.tools.logging :as log]
+            [puppetlabs.i18n.core :as i18n]
+            [puppetlabs.kitchensink.core :as ks])
+  (:import (org.quartz.impl.matchers GroupMatcher)
+           (org.quartz.impl StdSchedulerFactory SchedulerRepository)
+           (org.quartz JobBuilder SimpleScheduleBuilder TriggerBuilder
+                       Scheduler JobKey SchedulerException JobDataMap)
+           (org.quartz.utils Key)
+           (java.util Date UUID Properties)))
+
+(def shutdown-timeout-sec 30)
+
+(defn create-scheduler
+  "Creates and returns a scheduler with configured thread pool which
+  can be used for scheduling jobs."
+  [thread-count]
+  (let [config [["org.quartz.scheduler.skipUpdateCheck" "true"]
+                ["org.quartz.scheduler.instanceName" (.toString (UUID/randomUUID))]
+                ["org.quartz.threadPool.threadCount" (str thread-count)]]
+        props (.clone (System/getProperties))
+        _ (doseq [[k v] config]
+            (.setProperty props k v))
+        factory (StdSchedulerFactory. ^Properties props)
+        scheduler (.getScheduler factory)]
+    (.start scheduler)
+    scheduler))
+
+(defn build-executable-job
+  ([f job-name group-name] (build-executable-job f job-name group-name {}))
+  ([f job-name group-name options]
+   (when (nil? f)
+     (throw (IllegalArgumentException. ^String (i18n/trs "Scheduled function must be non-nil"))))
+   (let [jdm (JobDataMap.)
+         options (assoc options :job f)]
+     (.put jdm "jobData" options)
+     (-> (JobBuilder/newJob puppetlabs.trapperkeeper.services.scheduler.job)
+         (.withIdentity job-name group-name)
+         (.usingJobData jdm)
+         (.build)))))
 
 (defn interspaced
-  ; See docs on the service protocol,
-  ; puppetlabs.enterprise.services.protocols.scheduler
-  [n f pool]
-  (let [job (wrap-with-error-logging f)]
-    (-> (at-at/interspaced n job pool)
-        :id  ; return only the ID; do not leak the at-at RecurringJob instance
-        )))
+  [n f ^Scheduler scheduler group-name]
+  (try
+    (let [job-name (Key/createUniqueName group-name)
+           job (build-executable-job f job-name group-name {:interspaced n})
+           schedule (SimpleScheduleBuilder/simpleSchedule)
+           trigger (-> (TriggerBuilder/newTrigger)
+                       (.withSchedule schedule)
+                       (.startNow)
+                       (.build))]
+      (.scheduleJob scheduler job trigger)
+      (.getJobKey trigger))
+    (catch SchedulerException e
+      ; this can occur if the interface is being used while the scheduler is shutdown
+      (log/error e (i18n/trs "Failed to schedule job")))))
 
 (defn after
-  ; See docs on the service protocol,
-  ; puppetlabs.enterprise.services.protocols.scheduler
-  [n f pool]
-  (let [job (wrap-with-error-logging f)]
-    (-> (at-at/after n job pool)
-        :id  ; return only the ID; do not leak the at-at RecurringJob instance
-        )))
+  [n f ^Scheduler scheduler group-name]
+  (try
+    (let [job-name (Key/createUniqueName group-name)
+          job (build-executable-job f job-name group-name)
+          future-date (Date. ^Long (+ (System/currentTimeMillis) n))
+          trigger (-> (TriggerBuilder/newTrigger)
+                      (.startAt future-date)
+                      (.build))]
+      (.scheduleJob scheduler job trigger)
+      (.getJobKey trigger))
+    (catch SchedulerException e
+      ; this can occur if the interface is being used while the scheduler is shutdown
+      (log/error e (i18n/trs "Failed to schedule job")))))
+
+(defn interval
+  [^Scheduler scheduler repeat-delay f group-name]
+  (try
+    (let [job-name (Key/createUniqueName group-name)
+          job (build-executable-job f job-name group-name {:interval repeat-delay})
+          schedule (-> (SimpleScheduleBuilder/simpleSchedule)
+                       (.withIntervalInMilliseconds repeat-delay)
+                       ; allow quartz to reschedule things outside "org.quartz.jobStore.misfireThreshold" using internal logic
+                       ; this isn't sufficient for short interval jobs, so additional scheduling logic is included in the job itself
+                       (.withMisfireHandlingInstructionNextWithRemainingCount)
+                       (.repeatForever))
+
+          trigger (-> (TriggerBuilder/newTrigger)
+                      (.withSchedule schedule)
+                      (.startNow)
+                      (.build))]
+      (.scheduleJob scheduler job trigger)
+      (.getJobKey trigger))
+    (catch SchedulerException e
+      ; this can occur if the interface is being used while the scheduler is shutdown
+      (log/error e (i18n/trs "Failed to schedule job")))))
+
+(defn interval-after
+  [^Scheduler scheduler initial-delay repeat-delay f group-name]
+  (try
+    (let [job-name (Key/createUniqueName group-name)
+          job (build-executable-job f job-name group-name {:interval repeat-delay})
+          schedule (-> (SimpleScheduleBuilder/simpleSchedule)
+                       (.withIntervalInMilliseconds repeat-delay)
+                       ; allow quartz to reschedule things outside "org.quartz.jobStore.misfireThreshold" using internal logic
+                       ; this isn't sufficient for short interval jobs, so additional scheduling logic is included in the job itself
+                       (.withMisfireHandlingInstructionNextWithRemainingCount)
+                       (.repeatForever))
+          future-date (Date. ^Long (+ (System/currentTimeMillis) initial-delay))
+          trigger (-> (TriggerBuilder/newTrigger)
+                      (.withSchedule schedule)
+                      (.startAt future-date)
+                      (.build))]
+      (.scheduleJob scheduler job trigger)
+      (.getJobKey trigger))
+    (catch SchedulerException e
+      ; this can occur if the interface is being used while the scheduler is shutdown
+      (log/error e (i18n/trs "Failed to schedule job")))))
 
 (defn stop-job
-  "Gracefully stops the job specified by 'id'."
-  [id pool]
-  (at-at/stop id pool))
+  "Returns true if the job was deleted, false if the job wasn't found."
+  [^JobKey id ^Scheduler scheduler]
+  (try
+    (.deleteJob scheduler id)
+    (catch SchedulerException e
+      ; this can occur if the interface is being used while the scheduler is shut down
+      (log/debug e (i18n/trs "Failure stopping job"))
+      false)))
+
+(defn get-all-jobs
+  [^Scheduler scheduler]
+  (try
+    (let [groups (seq (.getJobGroupNames scheduler))
+          extract-keys (fn [group-name] (seq (.getJobKeys scheduler (GroupMatcher/jobGroupEquals group-name))))]
+      (mapcat extract-keys groups))
+    (catch SchedulerException e
+      ; this can occur if the interface is being used while the scheduler is shut down
+      (log/debug e (i18n/trs "Failure getting all jobs"))
+      [])))
 
 (defn stop-all-jobs!
-  "Stops all of the specified jobs."
-  [jobs pool]
-  (doseq [job jobs]
-    (stop-job job pool))
-
-  ; Shutdown at-at's thread pool.  This is the only way to do it, which is
-  ; unfortunate because it also resets the thread pool.  It's possible to
-  ; hack around this via ...
-  ;
-  ;   (-> pool
-  ;       :pool-atom
-  ;       (deref)
-  ;       :thread-pool
-  ;       (.shutdown))
-  ;
-  ; ... but that is a horrible hack.  I've opened an issue with at-at to add a
-  ; function that can be called to just stop the pool and not also reset it -
-  ; https://github.com/overtone/at-at/issues/13
-  (at-at/stop-and-reset-pool! pool))
+  [^Scheduler scheduler]
+  (when-not (.isShutdown scheduler)
+    (try
+      (let [sr (SchedulerRepository/getInstance)
+            scheduler-name (.getSchedulerName scheduler)]
+        (doseq [job (get-all-jobs scheduler)]
+          (try
+            (.interrupt scheduler job)
+            (.deleteJob scheduler job)
+            (catch SchedulerException e
+              ; this can occur if the interface is being used while the scheduler is shut down
+              (log/debug e (i18n/trs "Failure stopping job")))))
+
+        (when (= :timeout (ks/with-timeout shutdown-timeout-sec :timeout (.shutdown scheduler true)))
+          (log/info (i18n/trs "Failed to shut down scheduler service in {0} seconds" shutdown-timeout-sec))
+          (.shutdown scheduler))
+        ; explicitly remove the scheduler from the registry to prevent leaks, which can happen
+        ; if the jobs don't terminate immediately
+        (.remove sr scheduler-name))
+      (catch SchedulerException e
+        ; this can occur if the interface is being used while the scheduler is shut down
+        (log/debug e (i18n/trs "Failure stopping all jobs"))))))
+
+(defn get-jobs-in-group
+  [^Scheduler scheduler group-id]
+  (try
+    (seq (.getJobKeys scheduler (GroupMatcher/jobGroupEquals group-id)))
+    (catch SchedulerException e
+      ; this can occur if the function is called when the scheduler is shut down
+      (log/debug e (i18n/trs "Failure getting jobs in group"))
+      [])))


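The new `interval-after` above builds a Quartz simple schedule whose trigger fires first after `initial-delay` and then every `repeat-delay`, measured start-to-start. As a rough illustration of those semantics only (not the Quartz code itself), here is a pure-JDK sketch using `ScheduledExecutorService`; the class name and constants are invented for the example:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class IntervalAfterSketch {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        long initialDelay = 50;  // ms before the first run (interval-after's initial-delay)
        long repeatDelay = 100;  // ms between run starts (interval-after's repeat-delay)
        // scheduleAtFixedRate fires the first run after initialDelay and
        // subsequent runs every repeatDelay, start-to-start -- the same
        // shape as the Quartz simple schedule with a startAt trigger.
        ScheduledFuture<?> job =
            pool.scheduleAtFixedRate(runs::incrementAndGet, initialDelay, repeatDelay,
                                     TimeUnit.MILLISECONDS);
        Thread.sleep(initialDelay + 3 * repeatDelay + 25);
        job.cancel(true); // roughly analogous to stop-job
        pool.shutdown();
        // expect roughly four runs in the window; allow scheduling jitter
        System.out.println(runs.get() >= 3 ? "ok" : "too few runs: " + runs.get());
    }
}
```

Note that unlike this sketch, the Quartz version also has to handle misfires, which is why the schedule above selects `withMisfireHandlingInstructionNextWithRemainingCount`.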
=====================================
src/puppetlabs/trapperkeeper/services/scheduler/scheduler_service.clj
=====================================
@@ -2,54 +2,18 @@
   (:require [puppetlabs.trapperkeeper.services :as tk]
             [puppetlabs.trapperkeeper.services.protocols.scheduler :refer :all]
             [puppetlabs.trapperkeeper.services.scheduler.scheduler-core :as core]
-            [clojure.tools.logging :as log]))
+            [clojure.tools.logging :as log]
+            [puppetlabs.i18n.core :as i18n])
+  (:import (org.quartz SchedulerException)))
 
 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 ; Internal "helper" functions
 
-(defn get-pool
+(defn get-scheduler
   [this]
   (-> this
       (tk/service-context)
-      :pool))
-
-(defn get-jobs
-  [this]
-  (-> this
-      (tk/service-context)
-      :jobs))
-
-(defn- enqueue-job!
-  ([this id]
-   (enqueue-job! this id {}))
-  ([this id opts]
-   (let [result (assoc opts :job id)]
-    (swap! (get-jobs this) conj result)
-    result)))
-
-(defn- dequeue-job!
-  ([this job]
-    (swap! (get-jobs this) disj job))
-  ([this id keyword]
-    (when-let [item (first (filter #(= id (get % keyword)) @(get-jobs this)))]
-      (swap! (get-jobs this) disj item))))
-
-(defn- after-job
-  "Jobs run with `after` only execute once, and when done need to be reomved from
-  the scheduled jobs set.  This wraps the job's function so that the job is removed
-  correctly from the set when it completes (or fails)."
-  [this after-id f]
-  (fn []
-    (try
-      (f)
-      (finally
-        (dequeue-job! this after-id :after-id)))))
-
-(defn- jobs-by-group-id
-  [this group-id]
-  (if group-id
-    (filter #(= group-id (:group-id %)) @(get-jobs this))
-    @(get-jobs this)))
+      :scheduler))
 
 (defn- create-maybe-stop-job-fn
   "given a stop-job function, return function that when given a job returns
@@ -57,65 +21,84 @@
   [stop-fn]
   (fn [job]
     {:job job
-    :stopped? (stop-fn job)}))
+     :stopped? (stop-fn job)}))
+
+(def default-group-name "SCHEDULER_DEFAULT")
+
+(defn safe-group-id
+  [group-id]
+  (if (and (not (keyword? group-id)) (empty? group-id))
+    default-group-name
+    (str group-id)))
 
 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 ; Trapperkeeper service definition
 (tk/defservice scheduler-service
   SchedulerService
-  []
+  [[:ConfigService get-in-config]]
 
   (init [this context]
-    (log/debug "Initializing Scheduler Service")
-    (let [pool (core/create-pool)]
-      (assoc context :pool pool
-                     :jobs (atom #{})
-                     :after-id (atom 0))))
+    (log/info (i18n/trs "Initializing Scheduler Service"))
+     ;; the default in Quartz is 10 threads, so make that the default if it isn't specified
+    (let [scheduler (core/create-scheduler (get-in-config [:scheduler :thread-count] 10))]
+      (assoc context :scheduler scheduler)))
 
   (stop [this context]
-    (log/debug "Shutting down Scheduler Service")
-    ; Stop any jobs that are still running
-    (core/stop-all-jobs! @(:jobs context) (get-pool this))
-    (log/debug "Scheduler Service shutdown complete.")
+    (log/info (i18n/trs "Shutting down Scheduler Service"))
+    (core/stop-all-jobs! (get-scheduler this))
+    (log/info (i18n/trs "Scheduler Service shutdown complete."))
     context)
 
   (interspaced [this n f]
-    (interspaced this n f nil))
+    (interspaced this n f default-group-name))
 
   (interspaced [this n f group-id]
-               (let [id (core/interspaced n f (get-pool this))]
-                 (enqueue-job! this id {:group-id group-id})))
+    (core/interspaced n f (get-scheduler this) (safe-group-id group-id)))
 
   (after [this n f]
-   (after this n f nil))
+   (after this n f default-group-name))
 
   (after [this n f group-id]
-     ; use after-id to identify the job for the cleanup "after-job" wrapper
-    (let [after-id (swap! (:after-id (tk/service-context this)) inc)
-          ; wrap the job function in a function that will remove the job from the job set when it is done
-          wrapped-fn (after-job this after-id f)
-          id (core/after n wrapped-fn (get-pool this))]
-      (enqueue-job! this id {:after-id after-id
-                           :group-id   group-id})))
+     (core/after n f (get-scheduler this) (safe-group-id group-id)))
+
+  (interval [this n f]
+    (interval this n f default-group-name))
+
+  (interval [this n f group-id]
+    (core/interval (get-scheduler this) n f (safe-group-id group-id)))
+
+  (interval-after [this initial-delay repeat-delay f]
+    (interval-after this initial-delay repeat-delay f default-group-name))
+
+  (interval-after [this initial-delay repeat-delay f group-id]
+    (core/interval-after (get-scheduler this) initial-delay repeat-delay f (safe-group-id group-id)))
 
   (stop-job [this job]
-    (let [result (core/stop-job (:job job) (get-pool this))]
-       (dequeue-job! this job)
-       result))
+    (core/stop-job job (get-scheduler this)))
 
   (stop-jobs
     [this]
-    (stop-jobs this nil))
+    (stop-jobs this default-group-name))
 
   (stop-jobs [this group-id]
-    (let [jobs-by-group (jobs-by-group-id this group-id)]
+    (let [jobs-by-group (core/get-jobs-in-group (get-scheduler this) (safe-group-id group-id))]
        (reduce conj []
          (map
            (create-maybe-stop-job-fn (partial stop-job this))
-            jobs-by-group))))
+           jobs-by-group))))
+
+  (get-jobs
+   [this]
+   (core/get-all-jobs (get-scheduler this)))
+
+  (get-jobs
+    [this group-id]
+    (core/get-jobs-in-group (get-scheduler this) (safe-group-id group-id)))
 
   (count-jobs [this]
-    (count-jobs this nil))
+    (let [jobs (core/get-all-jobs (get-scheduler this))]
+      (count jobs)))
 
   (count-jobs [this group-id]
-    (count (jobs-by-group-id this group-id))))
+    (count (core/get-jobs-in-group (get-scheduler this) (safe-group-id group-id)))))


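The service's stop path now delegates to `core/stop-all-jobs!`, which follows an interrupt-then-bounded-wait pattern: interrupt each job, attempt a blocking shutdown under `ks/with-timeout`, and fall back to a non-waiting shutdown. The same pattern, sketched with a plain JDK executor (class and names here are illustrative, not the service's code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TimedShutdownSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // A long-running job that only exits when interrupted, like a
        // scheduled job blocked in (deref ...) at shutdown time.
        pool.submit(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // interruption lets the job wind down promptly
            }
        });
        pool.shutdownNow(); // interrupt running jobs; stop accepting new ones
        // Bounded wait for termination, analogous to wrapping the blocking
        // (.shutdown scheduler true) call in ks/with-timeout.
        boolean stopped = pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(stopped ? "ok" : "timeout");
    }
}
```

Without the interrupt step, the bounded wait would time out here, which mirrors why the scheduler interrupts each job before shutting down.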
=====================================
test/integration/puppetlabs/trapperkeeper/services/scheduler/scheduler_service_test.clj
=====================================
@@ -1,17 +1,19 @@
 (ns puppetlabs.trapperkeeper.services.scheduler.scheduler-service-test
   (:require [clojure.test :refer :all]
+            [puppetlabs.kitchensink.core :as ks]
             [puppetlabs.trapperkeeper.testutils.bootstrap :refer :all]
             [puppetlabs.trapperkeeper.services.scheduler.scheduler-service :refer :all]
             [puppetlabs.trapperkeeper.services.protocols.scheduler :refer :all]
-            [puppetlabs.trapperkeeper.app :as tk]
-            [overtone.at-at :as at-at]))
+            [puppetlabs.trapperkeeper.services.scheduler.scheduler-core :as sc]
+            [puppetlabs.trapperkeeper.app :as tk])
+  (:import [java.util.concurrent TimeUnit CountDownLatch]))
 
 (deftest ^:integration test-interspaced
   (testing "without group-id"
     (with-app-with-empty-config app [scheduler-service]
       (testing "interspaced"
         (let [service (tk/get-service app :SchedulerService)
-              num-runs 3 ; let it run a few times, but not too many
+              num-runs 10 ; let it run a few times, but not too many
               interval 300
               p (promise)
               counter (atom 0)
@@ -40,7 +42,10 @@
             (stop-job service job-id))
 
           (testing (str "Each delay should be at least " interval "ms")
-            (is (every? (fn [delay] (>= delay interval)) @delays)))))))
+            (is (every? (fn [delay] (>= delay interval)) @delays)))
+
+          (testing "can schedule far in the future"
+            (interspaced service 21026149688 (constantly nil)))))))
 
   (testing "with group-id"
     (with-app-with-empty-config app [scheduler-service]
@@ -82,16 +87,18 @@
   (testing "without group-id"
     (with-app-with-empty-config app [scheduler-service]
       (testing "after"
-        (let [delay 100]
+        (let [delay 100
+              service (tk/get-service app :SchedulerService)]
           (testing "should execute at least " delay " milliseconds in the future"
             (let [completed (promise)
-                  job #(deliver completed (System/currentTimeMillis))
-                  service (tk/get-service app :SchedulerService)]
+                  job #(deliver completed (System/currentTimeMillis))]
               (let [schedule-time (System/currentTimeMillis)]
                 (after service delay job)
                 (let [execution-time (deref completed)
                       actual-delay (- execution-time schedule-time)]
-                  (is (>= actual-delay delay))))))))))
+                  (is (>= actual-delay delay))))))
+          (testing "can schedule far in the future"
+            (after service 21026149688 (constantly nil)))))))
 
   (testing "with group-id"
     (with-app-with-empty-config app [scheduler-service]
@@ -107,12 +114,6 @@
                       actual-delay (- execution-time schedule-time)]
                   (is (>= actual-delay delay)))))))))))
 
-; This test has a race condition, but it is very unlikley to occur in reality,
-; and so far it's actually been impossible to get this test to fail due to a
-; lost race.  'stop-job' is probably impossible to test deterministically.
-; It was decided that having a test with a race condition was better than no test
-; at all in this case, primarily due to the fact that the underlying scheduler
-; library (at-at) has no tests of its own.  :(
 (deftest ^:integration test-stop-job
   (testing "without group-id"
     (testing "stop-job lets a job complete but does not run it again"
@@ -141,7 +142,8 @@
               (Thread/sleep 100)
               (is (= original-start-time @start-time)))
             (testing "there should be no other jobs running"
-              (is (= 0 (count @(get-jobs service))))))))))
+              (is (= 0 (count-jobs service)))))))))
+
   (testing "with group-id"
     (testing "stop-job lets a job complete but does not run it again"
       (with-app-with-empty-config app [scheduler-service]
@@ -169,11 +171,16 @@
               (Thread/sleep 100)
               (is (= original-start-time @start-time)))
             (testing "there should be no other jobs running"
-              (is (= 0 (count @(get-jobs service)))))))))))
+              (is (= 0 (count-jobs service))))))))))
 
 (defn guaranteed-start-interval-job
   ([service interval]
-    (guaranteed-start-interval-job service interval nil))
+   (let [started (promise)
+         job (fn []
+               (deliver started nil))
+         result (interspaced service interval job)]
+     (deref started)
+     result))
 
   ([service interval group-id]
    (let [started (promise)
@@ -192,15 +199,15 @@
             job-0 (guaranteed-start-interval-job service interval)]
         (is (= 1 (count-jobs service)))
         (let [job-1 (guaranteed-start-interval-job service interval)]
-          (is (= 2 (count-jobs service))
+          (is (= 2 (count-jobs service)))
           (let [job-2 (guaranteed-start-interval-job service interval)]
-            (is (= 3 (count-jobs service)))
-            (stop-job service job-0)
-            (is (= 2 (count-jobs service)))
-            (stop-job service job-1)
-            (is (= 1 (count-jobs service)))
-            (stop-job service job-2)
-            (is (= 0 (count-jobs service)))))))))
+              (is (= 3 (count-jobs service)))
+              (stop-job service job-0)
+              (is (= 2 (count-jobs service)))
+              (stop-job service job-1)
+              (is (= 1 (count-jobs service)))
+              (stop-job service job-2)
+              (is (= 0 (count-jobs service))))))))
 
   (testing "count-jobs shows correct number of group-id and non-group-id jobs"
     (with-app-with-empty-config app [scheduler-service]
@@ -215,27 +222,27 @@
           (let [job-1 (guaranteed-start-interval-job service interval)]
             (is (= 3 (count-jobs service)))
             (is (= 1 (count-jobs service group-id)))
-              (let [group-id-job-1 (guaranteed-start-interval-job service interval group-id)]
+            (let [group-id-job-1 (guaranteed-start-interval-job service interval group-id)]
+              (is (= 4 (count-jobs service)))
+              (is (= 2 (count-jobs service group-id)))
+              (let [job-2 (guaranteed-start-interval-job service interval)]
+                (is (= 5 (count-jobs service)))
+                (is (= 2 (count-jobs service group-id)))
+                (stop-job service job-0)
                 (is (= 4 (count-jobs service)))
                 (is (= 2 (count-jobs service group-id)))
-                (let [job-2 (guaranteed-start-interval-job service interval)]
-                  (is (= 5 (count-jobs service)))
-                  (is (= 2 (count-jobs service group-id)))
-                  (stop-job service job-0)
-                  (is (= 4 (count-jobs service)))
-                  (is (= 2 (count-jobs service group-id)))
-                  (stop-job service group-id-job-0)
-                  (is (= 3 (count-jobs service)))
-                  (is (= 1 (count-jobs service group-id)))
-                  (stop-job service job-1)
-                  (is (= 2 (count-jobs service)))
-                  (is (= 1 (count-jobs service group-id)))
-                  (stop-job service group-id-job-1)
-                  (is (= 1 (count-jobs service)))
-                  (is (= 0 (count-jobs service group-id)))
-                  (stop-job service job-2)
-                  (is (= 0 (count-jobs service)))
-                  (is (= 0 (count-jobs service group-id))))))))))
+                (stop-job service group-id-job-0)
+                (is (= 3 (count-jobs service)))
+                (is (= 1 (count-jobs service group-id)))
+                (stop-job service job-1)
+                (is (= 2 (count-jobs service)))
+                (is (= 1 (count-jobs service group-id)))
+                (stop-job service group-id-job-1)
+                (is (= 1 (count-jobs service)))
+                (is (= 0 (count-jobs service group-id)))
+                (stop-job service job-2)
+                (is (= 0 (count-jobs service)))
+                (is (= 0 (count-jobs service group-id))))))))))
 
   (testing "after reduces count when complete"
     (with-app-with-empty-config app [scheduler-service]
@@ -292,34 +299,356 @@
           (is (= 1 (count-jobs service group-id-1))))
 
         (testing "stopping by group id stops the job"
-          (stop-jobs service group-id-1))
+          (stop-jobs service group-id-1)
           (is (= 0 (count-jobs service)))
           (is (= 0 (count-jobs service group-id-0)))
-          (is (= 0 (count-jobs service group-id-1)))))))
+          (is (= 0 (count-jobs service group-id-1))))))))
 
-(defn schedule-random-jobs
-  "Schedules several random jobs and returns their at/at IDs."
+(defn schedule-random-interspaced-jobs
+  "Schedules several random jobs and returns their JobKeys."
   [service]
   (set
-    (for [x [1 2 3]]
-      (:job (interspaced service 1000 (constantly x))))))
-
-; In the future, it might be reasonable to add a function like this into the
-; scheduler service protocol.  If so, this function can be deleted.
-(defn at-at-scheduled-jobs
-  "Returns all of at-at's scheduled jobs."
-  [service]
-  (set (map :id (at-at/scheduled-jobs (get-pool service)))))
+   (for [x [1 2 3]]
+     (interspaced service 1000 (constantly x)))))
 
 (deftest ^:integration test-shutdown
   (testing "Any remaining jobs will be stopped when the service is stopped."
     (let [app (bootstrap-services-with-empty-config [scheduler-service])
           service (tk/get-service app :SchedulerService)
-          job-ids (schedule-random-jobs service)]
+          job-ids (schedule-random-interspaced-jobs service)]
 
-      (testing "at-at reports all of the jobs we just scheduled"
-        (is (= job-ids (at-at-scheduled-jobs service))))
+      (testing "reports all of the jobs we just scheduled"
+        (is (= (set job-ids) (set (get-jobs service)))))
 
       (testing "Stopping the service stops all of the scheduled jobs"
         (tk/stop app)
-        (is (= #{} (at-at-scheduled-jobs service)))))))
+        (is (empty? (get-jobs service))))))
+
+  (testing "Shutdown honors timeout and interrupts existing jobs"
+    ; redefine the default timeout so we don't have to wait forever
+    (with-redefs [sc/shutdown-timeout-sec 5]
+      (let [app (bootstrap-services-with-empty-config [scheduler-service])
+            service (tk/get-service app :SchedulerService)
+            is-test-done (promise)
+            job-done (promise)
+            ; run a job that waits on a promise that isn't ever delivered, but is interrupted
+            job (interspaced service 1000 (fn []
+                                            (try
+                                              (deref is-test-done)
+                                              (catch InterruptedException _
+                                                (deliver job-done true)))))]
+
+        (testing "job was correctly scheduled"
+          (is (= (set [job]) (set (get-jobs service)))))
+
+        (testing "Stopping the service does not block forever"
+          (is (not= :timeout (ks/with-timeout 10 :timeout (tk/stop app))))
+          (is (empty? (get-jobs service))))
+        (deref job-done)))))
+
+(defn distances
+  "Calculate the distances between each item in a sequence."
+  [v]
+  (map #(- %2 %1) v (rest v)))
+
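The `distances` helper above underpins the timing assertions in these tests: it maps a sequence of timestamps to the gaps between consecutive entries. The same idea in plain Java (class name invented for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class Distances {
    // successive differences: distances([100, 350, 580]) -> [250, 230]
    static List<Long> distances(List<Long> v) {
        List<Long> out = new ArrayList<>();
        for (int i = 1; i < v.size(); i++) {
            out.add(v.get(i) - v.get(i - 1));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(distances(List.of(100L, 350L, 580L)));
    }
}
```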
+;; define some acceptable bounded accuracy ratings.
+;; as the jvm warms up, the accuracy increases
+(def accuracy-high 150)
+(def accuracy-low -50)
+
+(deftest ^:integration test-interval
+  (let [num-runs 10] ; let it run a few times, but not too many
+    (testing "when recurring is > wait-time"
+      (doseq [[recurring-delay wait-time] [[500 100] [250 100] [233 200]]]
+        (let [expected-delta (- recurring-delay wait-time)]
+          (testing (str "testing recurring-delay " recurring-delay " wait time " wait-time)
+            (testing "subsequent delays are observed"
+              (with-app-with-empty-config app [scheduler-service]
+                (let [service (tk/get-service app :SchedulerService)
+                      counter (atom 0)
+                      stop-job-promise (promise)
+                      delays (atom [])
+                      start-times (atom [])
+                      last-completion-time (atom nil)
+                      job (fn []
+                            (let [local-counter (swap! counter inc)
+                                  start-time (System/currentTimeMillis)]
+                              (when @last-completion-time
+                                (let [delay (- start-time @last-completion-time)]
+                                  (swap! delays conj delay)))
+
+                              (swap! start-times conj start-time)
+
+                              (Thread/sleep wait-time)
+                              ; The test is over!
+                              (when (= local-counter num-runs)
+                                (deliver stop-job-promise nil))
+
+                              (reset! last-completion-time (System/currentTimeMillis))))
+                      job-start-time (System/currentTimeMillis)
+                      job-id (interval service recurring-delay job)]
+                    (deref stop-job-promise)
+                    (stop-job service job-id)
+                    ; all the jobs should be stopped
+                    (is (= 0 (count-jobs service)))
+                    (testing (str "Each delay should be about " expected-delta "ms (within accuracy bounds)")
+                      ; time between executions - expected time between executions should be in accuracy range
+                      (is (every? (fn [delay] (< accuracy-low (- delay expected-delta) accuracy-high)) @delays))
+                      ; time between starting points of recurring task
+                      (is (every? (fn [difference]  (< accuracy-low (- recurring-delay difference) accuracy-high)) (distances @start-times)))))))))))
+
+    (testing "when recurring < wait-time"
+      (doseq [[recurring-delay wait-time expected-delta] [[100 333 67] [100 250 50] [100 2330 70]]]
+        (testing (str "testing recurring-delay " recurring-delay " wait time " wait-time)
+          (testing "subsequent delays are observed"
+            (with-app-with-empty-config app [scheduler-service]
+              (let [service (tk/get-service app :SchedulerService)
+                    counter (atom 0)
+                    stop-job-promise (promise)
+                    delays (atom [])
+                    start-times (atom [])
+                    last-completion-time (atom nil)
+                    job (fn []
+                          (let [local-counter (swap! counter inc)
+                                start-time (System/currentTimeMillis)]
+                            (when @last-completion-time
+                              (let [delay (- start-time @last-completion-time)]
+                                (swap! delays conj delay)))
+                            (swap! start-times conj start-time)
+
+                            (Thread/sleep wait-time)
+
+                            ; The test is over!
+                            (when (= local-counter num-runs)
+                              (deliver stop-job-promise nil))
+
+                            (reset! last-completion-time (System/currentTimeMillis))))
+                    job-id (interval service recurring-delay job)]
+                (deref stop-job-promise)
+                (stop-job service job-id)
+                ; all the jobs should be stopped
+                (is (= 0 (count-jobs service)))
+
+                (testing (str "Each delay should be about " expected-delta "ms (within accuracy bounds)")
+                ; time between executions - expected time between executions should be in accuracy range
+                  (is (every? (fn [delay] (< accuracy-low (- delay expected-delta) accuracy-high)) @delays))
+                  ; time between starting points of recurring task
+                  (is (every? (fn [difference]  (< accuracy-low (- (+ wait-time expected-delta) difference) accuracy-high)) (distances @start-times))))))))))))
+
+
+(deftest ^:integration test-interval-after
+  (let [num-runs 10]
+    (doseq [[initial-delay recurring-delay wait-time] [[333 500 100] [1000 250 100] [20 233 200]]]
+      (let [expected-delta (- recurring-delay wait-time)]
+        (testing (str "testing initial-delay " initial-delay " recurring-delay " recurring-delay " wait time " wait-time)
+          (testing "initial delay is correctly observed and subsequent delays are observed"
+            (with-app-with-empty-config app [scheduler-service]
+              (let [service (tk/get-service app :SchedulerService)
+                    counter (atom 0)
+                    stop-job-promise (promise)
+                    first-completed-timestamp (promise)
+                    delays (atom [])
+                    start-times (atom [])
+                    last-completion-time (atom nil)
+                    job (fn []
+                          (let [local-counter (swap! counter inc)
+                                start-time (System/currentTimeMillis)]
+                            (when (= 1 local-counter)
+                              (deliver first-completed-timestamp start-time))
+                            (when @last-completion-time
+                              (let [delay (- start-time @last-completion-time)]
+                                (swap! delays conj delay)))
+
+                            (swap! start-times conj start-time)
+
+                            (Thread/sleep wait-time)
+
+                            ; The test is over!
+                            (when (= local-counter num-runs)
+                              (deliver stop-job-promise nil))
+
+                            (reset! last-completion-time (System/currentTimeMillis))))
+                    job-start-time (System/currentTimeMillis)
+                    job-id (interval-after service initial-delay recurring-delay job)
+                    result (deref first-completed-timestamp)]
+                  (is (<= initial-delay (- result job-start-time)))
+                  (deref stop-job-promise)
+                  (stop-job service job-id)
+                  ; all the jobs should be stopped
+                  (is (= 0 (count-jobs service)))
+                  (testing (str "Each delay should be about " expected-delta "ms (within accuracy bounds)")
+                    ; time between executions - expected time between executions should be in accuracy range
+                    (is (every? (fn [delay] (< accuracy-low (- delay expected-delta) accuracy-high)) @delays))
+                    ; time between starting points of recurring task
+                    (is (every? (fn [difference]  (< accuracy-low (- recurring-delay difference) accuracy-high)) (distances @start-times)))))))
+
+          (testing "initial delay is correctly observed and subsequent delays are observed with group"
+            (with-app-with-empty-config app [scheduler-service]
+              (let [service (tk/get-service app :SchedulerService)
+                    counter (atom 0)
+                    stop-job-promise (promise)
+                    first-completed-timestamp (promise)
+                    delays (atom [])
+                    start-times (atom [])
+                    last-completion-time (atom nil)
+                    group-id :unique-group-id
+                    job (fn []
+                          (let [local-counter (swap! counter inc)
+                                start-time (System/currentTimeMillis)]
+                            (when (= 1 local-counter)
+                              (deliver first-completed-timestamp start-time))
+                            (when @last-completion-time
+                              (let [delay (- start-time @last-completion-time)]
+                                (swap! delays conj delay)))
+
+                            (swap! start-times conj start-time)
+
+                            (Thread/sleep wait-time)
+
+                            ; The test is over!
+                            (when (= local-counter num-runs)
+                              (deliver stop-job-promise nil))
+
+                            (reset! last-completion-time (System/currentTimeMillis))))
+                    job-start-time (System/currentTimeMillis)
+                    job-id (interval-after service initial-delay recurring-delay job group-id)
+                    result (deref first-completed-timestamp)]
+                  (is (<= initial-delay (- result job-start-time)))
+                  (deref stop-job-promise)
+                  (stop-jobs service group-id)
+                  ;; all the jobs should be stopped
+                  (is (= 0 (count-jobs service)))
+                  (testing (str "Each delay should be about " expected-delta "ms (within accuracy bounds)")
+                    ; time between executions - expected time between executions should be in accuracy range
+                    (is (every? (fn [delay] (< accuracy-low (- delay expected-delta) accuracy-high)) @delays))
+                    ; time between starting points of recurring task
+                    (is (every? (fn [difference] (< accuracy-low (- recurring-delay difference) accuracy-high)) (distances @start-times))))))))))))
+
+(deftest ^:integration test-thread-starvation
+  ; override the default to set the number of thread-pool threads
+  (let [initial-thread-count 1]
+    (with-app-with-config app [scheduler-service] {:scheduler {:thread-count initial-thread-count}}
+      (let [service (tk/get-service app :SchedulerService)]
+        (testing "after"
+          (testing "all jobs execute, even if delayed"
+            (let [num-jobs 20
+                  delay 100
+                  execution-latch (CountDownLatch. num-jobs)
+                  wait-time 500
+                  release-the-hounds (promise)
+                  count (atom 0)
+                  start-times (atom [])
+                  job (fn []
+                        (deref release-the-hounds)
+                        (swap! start-times conj (System/currentTimeMillis))
+                        (Thread/sleep wait-time)
+                        (swap! count inc)
+                        (.countDown execution-latch))]
+              (doseq [job-index (range 0 num-jobs)]
+                (after service delay job))
+              ;; allow the jobs to run once they are all set up
+              (deliver release-the-hounds true)
+              (is (.await execution-latch 20 TimeUnit/SECONDS))
+              (is (= num-jobs @count))
+              (is (every? (fn [difference] (<= wait-time difference (+ wait-time accuracy-high))) (distances @start-times))))))
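
The spacing checked by that last assertion falls out of plain executor semantics. As a sketch independent of the scheduler service (using `java.util.concurrent` directly, not the service API), a one-thread pool serializes delayed jobs the same way:

```clojure
;; Sketch with a plain one-thread ScheduledThreadPoolExecutor, not the
;; scheduler service itself: both tasks become ready at the same time,
;; but the second cannot start until the first finishes sleeping, so
;; the start times end up roughly wait-time (200 ms here) apart.
(import '(java.util.concurrent Executors TimeUnit))

(def start-delta
  (let [pool (Executors/newScheduledThreadPool 1)
        starts (atom [])
        job (fn []
              (swap! starts conj (System/currentTimeMillis))
              (Thread/sleep 200))]
    (dotimes [_ 2]
      (.schedule pool ^Runnable job 50 TimeUnit/MILLISECONDS))
    (.shutdown pool)
    (.awaitTermination pool 5 TimeUnit/SECONDS)
    (- (second @starts) (first @starts))))
```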
+        (testing "interspaced"
+          (testing "all jobs execute at least once before they run again"
+            ;; this demonstrates that previously ready jobs are favored over rescheduled jobs
+            (let [num-jobs 20
+                  frequency 100
+                  execution-latch (CountDownLatch. (* 3 num-jobs))
+                  wait-time 250
+                  release-the-hounds (promise)
+                  run-count (atom 0)
+                  executed-jobs (atom [])
+                  create-identified-job (fn [id]
+                                          (fn []
+                                            (deref release-the-hounds)
+                                            (swap! executed-jobs conj id)
+                                            (Thread/sleep wait-time)
+                                            (swap! run-count inc)
+                                            (.countDown execution-latch)))]
+              (doseq [job-index (range 0 num-jobs)]
+                (interspaced service frequency (create-identified-job job-index)))
+              ;; allow the jobs to run once they are all set up
+              (deliver release-the-hounds true)
+              (is (.await execution-latch 20 TimeUnit/SECONDS))
+              ;; first 20 should be unique
+              (is (= 20 (count (distinct (take 20 @executed-jobs)))))
+              ;; second 20 should be unique
+              (is (= 20 (count (distinct (take 20 (drop 20 @executed-jobs))))))
+              ;; third 20 should be unique
+              (is (= 20 (count (distinct (take 20 (drop 40 @executed-jobs))))))
+              (stop-jobs service))))
+        (testing "interval"
+          (testing "interval jobs can starve other jobs"
+            ;; run 20 interval jobs starting at about the same time (registered one after another), with a wait time longer than the execution period;
+            ;; with only one thread, a single job should run successfully every time, and the others should be starved out
+            (let [num-jobs 20
+                  frequency 100
+                  execution-latch (CountDownLatch. (* 3 num-jobs))
+                  wait-time 250
+                  release-the-hounds (promise)
+                  run-count (atom 0)
+                  executed-jobs (atom [])
+                  create-identified-job (fn [id]
+                                          (fn []
+                                            (deref release-the-hounds)
+                                            (swap! executed-jobs conj id)
+                                            (Thread/sleep wait-time)
+                                            (swap! run-count inc)
+                                            (.countDown execution-latch)))]
+              (doseq [job-index (range 0 num-jobs)]
+                (interval service frequency (create-identified-job job-index)))
+              ;; allow the jobs to run once they are all set up
+              (deliver release-the-hounds true)
+              (is (.await execution-latch 20 TimeUnit/SECONDS))
+              (is (= 1 (count (distinct @executed-jobs))))
+              (stop-jobs service))))
+        (testing "Interval job correctly skips execution if thread is unavailable"
+          (let [num-jobs 3
+                execution-count (atom 0)
+                start-times (atom [])
+                stop-test (promise)
+                ; interval should run immediately, and then every 100 ms
+                interval-frequency 100
+                ; wait 50ms for the after
+                after-start-time 50
+                after-sleep-time 500
+                ; calculate the next logical interval time
+                effective-first-time (* (quot (+ after-start-time after-sleep-time interval-frequency)
+                                              interval-frequency)
+                                        interval-frequency)
+                interval-job (fn []
+                               (swap! start-times conj (System/currentTimeMillis))
+                               (when (= num-jobs (swap! execution-count inc))
+                                 (deliver stop-test true)))
+                after-job (fn []
+                            (Thread/sleep after-sleep-time))]
+            (interval service interval-frequency interval-job)
+            (after service after-start-time after-job)
+            ; wait for the right number of interval jobs to execute
+            (deref stop-test)
+            (stop-jobs service)
+            ; the gap between the first two starts should land on the next interval boundary after after-start-time + after-sleep-time
+            (is (<= (+ effective-first-time accuracy-low) (first (distances @start-times)) (+ effective-first-time accuracy-high)))
+            ; the next run should be about the interval frequency
+            (is (<= (+ interval-frequency accuracy-low) (nth (distances @start-times) 1) (+ interval-frequency accuracy-high)))))))))
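
The effective-first-time expression above rounds the moment the single thread frees up (after-start-time + after-sleep-time) up to the next interval boundary; with the values used in this test the arithmetic works out as:

```clojure
;; Worked example of the effective-first-time calculation: the thread
;; is busy until 50 + 500 = 550 ms, so the next 100 ms interval
;; boundary at which the interval job can fire is 600 ms.
(let [after-start-time 50
      after-sleep-time 500
      interval-frequency 100]
  (* (quot (+ after-start-time after-sleep-time interval-frequency)
           interval-frequency)
     interval-frequency))
;; => 600
```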
+
+(deftest ^:integration null-handling
+  (with-app-with-empty-config app [scheduler-service]
+      (let [service (tk/get-service app :SchedulerService)]
+        (testing "interspaced throws exception on nil function"
+          (is (thrown? IllegalArgumentException
+                (interspaced service 300 nil))))
+        (testing "after throws exception on nil function"
+          (is (thrown? IllegalArgumentException
+                (after service 300 nil))))
+        (testing "interval throws exception on nil function"
+          (is (thrown? IllegalArgumentException
+                (interval service 300 nil))))
+        (testing "interval-after throws exception on nil function"
+          (is (thrown? IllegalArgumentException
+                (interval-after service 1 300 nil)))))))
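
These tests cover the 1.1.3 changelog entry ("ensure a non-nil function is passed to scheduling routines"). A minimal sketch of such a guard, with `guard-fn` as a hypothetical name rather than the service's actual implementation, could be:

```clojure
;; Hypothetical nil-function guard of the kind these tests exercise;
;; guard-fn is an illustrative name, not the real scheduler API.
(defn guard-fn
  [f]
  (when (nil? f)
    (throw (IllegalArgumentException. "scheduled function must not be nil")))
  f)
```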
+
+
+
+


=====================================
test/unit/puppetlabs/trapperkeeper/services/scheduler/scheduler_core_test.clj deleted
=====================================
@@ -1,16 +0,0 @@
-(ns puppetlabs.trapperkeeper.services.scheduler.scheduler-core-test
-  (:require [clojure.test :refer :all]
-            [puppetlabs.trapperkeeper.services.scheduler.scheduler-core :refer :all]
-            [puppetlabs.trapperkeeper.testutils.logging :refer :all]
-            [schema.test :as schema-test]))
-
-(use-fixtures :once schema-test/validate-schemas)
-
-(deftest wrap-with-error-logging-test
-  (testing "when a job throws an exception, it is logged and re-thrown"
-    (let [f #(throw (Exception. "bummer"))]
-      (with-test-logging
-        (is (thrown-with-msg? Exception
-                              #"bummer"
-                              ((wrap-with-error-logging f))))
-        (is (logged? #"scheduled job threw error" :error))))))



View it on GitLab: https://salsa.debian.org/java-team/trapperkeeper-scheduler-clojure/-/commit/3dacb894100003b87ef4de6eb8e0546f19832fcd


