[Git][java-team/trapperkeeper-scheduler-clojure][upstream] New upstream version 1.1.3
Louis-Philippe Véronneau
gitlab at salsa.debian.org
Wed Dec 30 19:00:24 GMT 2020
Louis-Philippe Véronneau pushed to branch upstream at Debian Java Maintainers / trapperkeeper-scheduler-clojure
3dacb894 by Louis-Philippe Véronneau at 2020-12-19T14:41:38-05:00
New upstream version 1.1.3
- - - - -
17 changed files:
- .gitignore
- .travis.yml
- + Makefile
- + dev-resources/Makefile.i18n
- ext/travisci/test.sh
- + locales/eo.po
- + locales/trapperkeeper-scheduler.pot
- project.clj
- src/puppetlabs/trapperkeeper/services/protocols/scheduler.clj
- + src/puppetlabs/trapperkeeper/services/scheduler/job.clj
- src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
- src/puppetlabs/trapperkeeper/services/scheduler/scheduler_service.clj
- test/integration/puppetlabs/trapperkeeper/services/scheduler/scheduler_service_test.clj
- − test/unit/puppetlabs/trapperkeeper/services/scheduler/scheduler_core_test.clj
@@ -10,3 +10,6 @@ pom.xml.asc
@@ -1,8 +1,8 @@
language: clojure
-lein: lein2
+lein: 2.9.1
-- oraclejdk7
-- openjdk7
+- openjdk8
+- openjdk11
script: ./ext/travisci/test.sh
email: false
@@ -1,3 +1,26 @@
+## 1.1.3
+ * ensure a non-nil function is passed to scheduling routines
+## 1.1.2
+ * add testing for java 11, disambiguate StdScheduleFactory constructor
+## 1.1.1
+ * use a unique scheduler rather than the default scheduler
+## 1.1.0
+ * add interface for `interval` and `interval-after` to the
+ protocol and implementation to allow regular cadence jobs.
+ * add support for thread-count configuration value that defaults to 10
+## 1.0.1
+ * exclude c3p0 from dependencies, it isn't used.
+## 1.0.0
+ * switch from at/at to the Quartz schedule library
+ * update clj-parent and drop support for java 7
+ * reimplement the group-id mapping using quartz internals
+ * add interface for listing the existing job identifiers
## 0.1.0
* Add the concept of group-id to the job creation endpoints to allow
jobs to be grouped together for listing and cancellation.
@@ -0,0 +1,4 @@
+# This will cause the puppetserver-maintainers group to be assigned
+# review of any opened PRs against the branches containing this file.
+* @puppetlabs/puppetserver-maintainers
@@ -0,0 +1 @@
+include dev-resources/Makefile.i18n
@@ -29,6 +29,22 @@ The functions that are currently available are as follows:
Returns an identifier that can be used to reference this scheduled job (e.g.,
for cancellation) later. A group identifier `group-id` can be provided that
allows jobs in the same group to be stopped at the same time.
+* `interval [interval-ms f]`: schedules a job that will call `f`, block until
+ that call completes, and then run again at the next logical interval based on
+ `interval-ms` and the original start time. In other words, `f` will get called every
+ `interval-ms` unless the execution time for `f` exceeds `interval-ms` in which case
+ that execution is skipped.
+ Returns an identifier that can be used to reference this scheduled job (e.g.,
+ for cancellation) later.
+* `interval [interval-ms f group-id]`: schedules a job that will call `f`, block until
+ that call completes, and then run again at the next logical interval based on
+ `interval-ms` and the original start time. In other words, `f` will get called every
+ `interval-ms` unless the execution time for `f` exceeds `interval-ms` in which case
+ that execution is skipped. If there are insufficient threads in the thread pool to
+ run the interval job at the time of execution, the job will be skipped. Returns an
+ identifier that can be used to reference this scheduled job (e.g.,
+ for cancellation) later. A group identifier `group-id` can be provided that
+ allows jobs in the same group to be stopped at the same time.
* `after [interval-ms f]`: schedules a job that will call `f` a single time, after
a delay of `interval-ms` milliseconds. Returns an identifier that can be used
to reference this scheduled job (e.g. for cancellation) later.
@@ -37,6 +53,11 @@ The functions that are currently available are as follows:
to reference this scheduled job (e.g. for cancellation) later. A group identifier
`group-id` can be provided that allows jobs in the same group to be stopped
at the same time.
+* `interval-after [initial-delay-ms interval-ms f]`: Similar to `interval` but delays
+ initial execution until `initial-delay-ms` has elapsed.
+* `interval-after [initial-delay-ms interval-ms f group-id]`: Similar to `interval` but delays
+ initial execution until `initial-delay-ms` has elapsed. A group identifier `group-id` can be provided that
+ allows jobs in the same group to be stopped at the same time.
* `stop-job [job-id]`: Given a `job-id` returned by one of the previous functions,
cancels the job. If the job is currently executing it will be allowed to complete,
but will not be invoked again afterward. Returns `true` if the job was successfully
@@ -53,32 +74,23 @@ The functions that are currently available are as follows:
* `count-jobs [group-id]`: Return a count of the total number of scheduled jobs
with the associated `group-id` known to the scheduling service.
`after` jobs that have completed won't be included in the total.
+* `get-jobs []`: return a list of the current job identifiers
+* `get-jobs [group-id]`: return a list of the current job identifiers associated
+ with the specified group identifier
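The `interval` timing rules in the list above can be modelled with plain arithmetic. The following is a minimal sketch, not the service's implementation: `next-fire-time` and `missed-interval?` are hypothetical helper names, and all times are assumed to be epoch milliseconds. The second function uses the same comparison as the skip check in `job.clj` (current time later than the scheduled fire time plus one interval).

```clojure
(defn next-fire-time
  "Hypothetical helper: returns the first tick strictly after now-ms on the
  grid start-ms, start-ms + interval-ms, start-ms + 2*interval-ms, ..."
  [start-ms interval-ms now-ms]
  (if (< now-ms start-ms)
    start-ms
    (+ start-ms
       (* interval-ms (inc (quot (- now-ms start-ms) interval-ms))))))

(defn missed-interval?
  "Hypothetical helper: true when a run scheduled for scheduled-ms is being
  considered more than one full interval late, so it should be skipped."
  [scheduled-ms interval-ms now-ms]
  (> now-ms (+ scheduled-ms interval-ms)))

;; A job started at t=0 with a 1000 ms interval whose call finishes at
;; t=2500 next runs at t=3000: the ticks at 1000 and 2000 are not replayed.
(next-fire-time 0 1000 2500)
```

Anchoring every run to the original start time keeps the cadence from drifting, which is the difference from `interspaced`, where the delay is measured from the end of the previous call.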
### Implementation Details
-The current implementation of the `SchedulerService` is a fairly thin wrapper around
-the [`overtone/at-at`](https://github.com/overtone/at-at) library. This approach
-was chosen for a couple of reasons:
-* A few existing PL projects already use this library, so we thought it'd be better
- not to introduce a different scheduling subsystem until all of the PL TK projects
- are aligned to use the SchedulerService.
-* The `at-at` API seems like a pretty reasonable Clojure API for scheduling tasks.
+A configuration value is available under scheduler->thread-count that controls
+the number of threads used internally by the quartz library for job scheduling.
+If not specified, it defaults to 10, which is the quartz internal default.
-It would probably be a good idea to switch the implementation out to use a different
-backend in the future; without having done too much investigation yet, I'd be
-interested in looking into Quartz/Quartzite, simply because Quartz is very widely-used
-and battle-tested in the Java world. `at-at` does not appear to be being maintained
-any longer. We've had a few minor issues with it (especially w/rt shutting down
-the system), and haven't gotten any responses from the maintainer on the github
-issues we've opened. Also, the source repo for `at-at` contains no tests :(
+The current implementation of the `SchedulerService` is a wrapper around
+the [`org.quartz-scheduler/quartz`](http://www.quartz-scheduler.org/) library.
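As a sketch of the `thread-count` lookup described above: the service reads the value at the path `[:scheduler :thread-count]` and falls back to 10. The configuration map and the `thread-count` helper below are hypothetical, and plain `get-in` stands in for the `get-in-config` function that the ConfigService provides.

```clojure
;; Hypothetical configuration, expressed as Clojure data.
(def example-config
  {:scheduler {:thread-count 4}})

(defn thread-count
  "Hypothetical helper: returns the configured scheduler thread count,
  or 10 when the setting is absent."
  [config]
  (get-in config [:scheduler :thread-count] 10))

(thread-count example-config) ; configured value
(thread-count {})             ; falls back to the default of 10
```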
### What's Next?
-* Add additional scheduling API functions, e.g. `every`.
+* Add additional scheduling API functions with more complicated recurring models.
* Add API for introspecting the state of currently scheduled jobs
-* Consider porting the backend to something more sophisticated (and maintained)
- than `at-at`; if we do this, the intent would be to maintain the existing API.
@@ -0,0 +1,136 @@
+# -*- Makefile -*-
+# This file was generated by the i18n leiningen plugin
+# Do not edit this file; it will be overwritten the next time you run
+# lein i18n init
+# The name of the package into which the translations bundle will be placed
+# The name of the POT file into which the gettext code strings (msgid) will be placed
+# The list of names of packages covered by the translation bundle;
+# by default it contains a single package - the same where the translations
+# bundle itself is placed - but this can be overridden - preferably in
+# the top level Makefile
+LOCALES=$(basename $(notdir $(wildcard locales/*.po)))
+BUNDLE_DIR=$(subst .,/,$(BUNDLE))
+BUNDLE_FILES=$(patsubst %,resources/$(BUNDLE_DIR)/Messages_%.class,$(LOCALES))
+FIND_SOURCES=find src -name \*.clj
+# xgettext before 0.19 does not understand --add-location=file. Even CentOS
+# 7 ships with an older gettext. We will therefore generate full location
+# info on those systems, and only file names where xgettext supports it
+LOC_OPT=$(shell xgettext --add-location=file -f - </dev/null >/dev/null 2>&1 && echo --add-location=file || echo --add-location)
+ :locales #{$(patsubst %,"%",$(LOCALES))}
+ :packages [$(patsubst %,"%",$(PACKAGES))]
+ :bundle $(patsubst %,"%",$(BUNDLE).Messages)
+i18n: msgfmt
+# Update locales/<project-name>.pot
+update-pot: locales/$(POT_NAME)
+locales/$(POT_NAME): $(shell $(FIND_SOURCES)) | locales
+ @tmp=$$(mktemp $@.tmp.XXXX); \
+ | xgettext --from-code=UTF-8 --language=lisp \
+ --copyright-holder='Puppet <docs at puppet.com>' \
+ --package-name="$(BUNDLE)" \
+ --package-version="$(BUNDLE_VERSION)" \
+ --msgid-bugs-address="docs at puppet.com" \
+ -k \
+ -kmark:1 -ki18n/mark:1 \
+ -ktrs:1 -ki18n/trs:1 \
+ -ktru:1 -ki18n/tru:1 \
+ -ktrun:1,2 -ki18n/trun:1,2 \
+ -ktrsn:1,2 -ki18n/trsn:1,2 \
+ $(LOC_OPT) \
+ --add-comments --sort-by-file \
+ -o $$tmp -f -; \
+ sed -i.bak -e 's/charset=CHARSET/charset=UTF-8/' $$tmp; \
+ sed -i.bak -e 's/POT-Creation-Date: [^\\]*/POT-Creation-Date: /' $$tmp; \
+ rm -f $$tmp.bak; \
+ if ! diff -q -I POT-Creation-Date $$tmp $@ >/dev/null 2>&1; then \
+ mv $$tmp $@; \
+ else \
+ rm $$tmp; touch $@; \
+ fi
+# Run msgfmt over all .po files to generate Java resource bundles
+# and create the locales.clj file
+msgfmt: $(BUNDLE_FILES) $(LOCALES_CLJ) clean-orphaned-bundles
+# Force rebuild of locales.clj if its contents are not the desired ones. The
+# shell echo is used to add a trailing newline to match the one from `cat`
+ifneq ($(shell cat $(LOCALES_CLJ) 2> /dev/null),$(shell echo '$(LOCALES_CLJ_CONTENTS)'))
+$(LOCALES_CLJ): | resources
+ @echo "Writing $@"
+ @echo "$$LOCALES_CLJ_CONTENTS" > $@
+# Remove every resource bundle that wasn't generated from a PO file.
+# We do this because we used to generate the english bundle directly from the POT.
+.PHONY: clean-orphaned-bundles
+ @for bundle in resources/$(BUNDLE_DIR)/Messages_*.class; do \
+ locale=$$(basename "$$bundle" | sed -E -e 's/\$$?1?\.class$$/_class/' | cut -d '_' -f 2;); \
+ if [ ! -f "locales/$$locale.po" ]; then \
+ rm "$$bundle"; \
+ fi \
+ done
+resources/$(BUNDLE_DIR)/Messages_%.class: locales/%.po | resources
+ msgfmt --java2 -d resources -r $(BUNDLE).Messages -l $(*F) $<
+# Use this to initialize translations. Updating the PO files is done
+# automatically through a CI job that utilizes the scripts in the project's
+# `bin` file, which themselves come from the `clj-i18n` project.
+locales/%.po: | locales
+ @if [ ! -f $@ ]; then \
+ touch $@ && msginit --no-translator -l $(*F) -o $@ -i locales/$(POT_NAME); \
+ fi
+resources locales:
+ @mkdir $@
+ $(info $(HELP))
+ @echo
+.PHONY: help
+define HELP
+This Makefile assists in handling i18n related tasks during development. Files
+that need to be checked into source control are put into the locales/ directory.
+They are
+ locales/$(POT_NAME) - the POT file generated by 'make update-pot'
+ locales/$$LANG.po - the translations for $$LANG
+Only the $$LANG.po files should be edited manually; this is usually done by
+You can use the following targets:
+ i18n: refresh all the files in locales/ and recompile resources
+ update-pot: extract strings and update locales/$(POT_NAME)
+ locales/LANG.po: create translations for LANG
+ msgfmt: compile the translations into Java classes; this step is
+ needed to make translations available to the Clojure code
+ and produces Java class files in resources/
+# @todo lutter 2015-04-20: for projects that use libraries with their own
+# translation, we need to combine all their translations into one big po
+# file and then run msgfmt over that so that we only have to deal with one
+# resource bundle
@@ -1,3 +1,3 @@
-lein2 test
+lein test
@@ -0,0 +1,30 @@
+# Esperanto translations for puppetlabs.trapperkeeper_scheduler package.
+# Copyright (C) 2017 Puppet <docs at puppet.com>
+# This file is distributed under the same license as the puppetlabs.trapperkeeper_scheduler package.
+# Automatically generated, 2017.
+msgid ""
+msgstr ""
+"Project-Id-Version: puppetlabs.trapperkeeper_scheduler \n"
+"Report-Msgid-Bugs-To: docs at puppet.com\n"
+"POT-Creation-Date: \n"
+"PO-Revision-Date: \n"
+"Last-Translator: Automatically generated\n"
+"Language-Team: none\n"
+"Language: eo\n"
+"MIME-Version: 1.0\n"
+"Content-Type: text/plain; charset=UTF-8\n"
+"Content-Transfer-Encoding: 8bit\n"
+"Plural-Forms: nplurals=2; plural=(n != 1);\n"
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
+msgid "scheduled job threw error"
+msgstr ""
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_service.clj
+msgid "Initializing Scheduler Service"
+msgstr ""
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_service.clj
+msgid "Shutting down Scheduler Service"
+msgstr ""
@@ -0,0 +1,68 @@
+# Copyright (C) YEAR Puppet <docs at puppet.com>
+# This file is distributed under the same license as the puppetlabs.trapperkeeper_scheduler package.
+#, fuzzy
+msgid ""
+msgstr ""
+"Project-Id-Version: puppetlabs.trapperkeeper_scheduler \n"
+"X-Git-Ref: ccf78ef0d6b688ce8fc24988277ddf04de9f8945\n"
+"Report-Msgid-Bugs-To: docs at puppet.com\n"
+"POT-Creation-Date: \n"
+"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
+"Last-Translator: FULL NAME <EMAIL at ADDRESS>\n"
+"Language-Team: LANGUAGE <LL at li.org>\n"
+"Language: \n"
+"MIME-Version: 1.0\n"
+"Content-Type: text/plain; charset=UTF-8\n"
+"Content-Transfer-Encoding: 8bit\n"
+#: src/puppetlabs/trapperkeeper/services/scheduler/job.clj
+msgid "Skipping execution of job {0} because of missed interval."
+msgstr ""
+#: src/puppetlabs/trapperkeeper/services/scheduler/job.clj
+msgid "scheduled job threw error"
+msgstr ""
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
+msgid "Scheduled function must be non-nil"
+msgstr ""
+#. this can occur if the interface is being used while the scheduler is shutdown
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
+msgid "Failed to schedule job"
+msgstr ""
+#. this can occur if the interface is being used while the scheduler is shutdown
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
+msgid "Failure stopping job"
+msgstr ""
+#. this can occur if the interface is being used while the scheduler is shutdown
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
+msgid "Failure getting all jobs"
+msgstr ""
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
+msgid "Failed to shutdown schedule service in {0} seconds"
+msgstr ""
+#. this can occur if the interface is being used while the scheduler is shutdown
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
+msgid "Failure stopping all jobs"
+msgstr ""
+#. this can occur if the function is called when the scheduler is shutdown
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_core.clj
+msgid "Failure getting jobs in group"
+msgstr ""
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_service.clj
+msgid "Initializing Scheduler Service"
+msgstr ""
+#: src/puppetlabs/trapperkeeper/services/scheduler/scheduler_service.clj
+msgid "Shutting down Scheduler Service"
+msgstr ""
@@ -1,14 +1,16 @@
-(def tk-version "1.1.1")
-(def ks-version "1.0.0")
-(defproject puppetlabs/trapperkeeper-scheduler "0.1.0"
+(defproject puppetlabs/trapperkeeper-scheduler "1.1.3"
:description "Trapperkeeper Scheduler Service"
- :dependencies [[org.clojure/clojure "1.6.0"]
- [puppetlabs/trapperkeeper ~tk-version]
- [puppetlabs/kitchensink ~ks-version]
- [prismatic/schema "0.4.0"]
- [overtone/at-at "1.2.0"]]
+ :dependencies [[org.clojure/clojure]
+ [puppetlabs/trapperkeeper]
+ [puppetlabs/i18n]
+ [puppetlabs/kitchensink]
+ [org.quartz-scheduler/quartz "2.3.2" :exclusions [c3p0]]]
+ :min-lein-version "2.9.1"
+ :parent-project {:coords [puppetlabs/clj-parent "4.6.7"]
+ :inherit [:managed-dependencies]}
:pedantic? :abort
@@ -23,10 +25,10 @@
:sign-releases false}]]
:profiles {:dev {:source-paths ["dev"]
- :dependencies [[org.clojure/tools.namespace "0.2.4"]
- [puppetlabs/trapperkeeper ~tk-version :classifier "test" :scope "test"]
- [puppetlabs/kitchensink ~ks-version :classifier "test" :scope "test"]
- [spyscope "0.1.4" :exclusions [clj-time]]]
- :injections [(require 'spyscope.core)]}}
+ :dependencies [[puppetlabs/trapperkeeper :classifier "test" :scope "test"]
+ [puppetlabs/kitchensink :classifier "test" :scope "test"]]}}
+ :plugins [[lein-parent "0.3.7"]
+ [puppetlabs/i18n "0.8.0"]]
+ :aot [puppetlabs.trapperkeeper.services.scheduler.job]
:repl-options {:init-ns user})
@@ -19,6 +19,26 @@
group can be provided to associate jobs with each other to allow
them to be stopped together.")
+ (interval
+ [this n f]
+ [this n f group-id]
+ "Calls 'f' repeatedly with a delay of 'n' milliseconds between the
+ beginning of a given invocation and the beginning of the following
+ invocation. If an invocation's execution time is longer than the interval,
+ the subsequent invocation is skipped.
+ Returns an identifier for the scheduled job. An optional
+ group-id can be provided to collect a set of jobs into one group to allow
+ them to be stopped together.")
+ (interval-after
+ [this initial-delay repeat-delay f]
+ [this initial-delay repeat-delay f group-id]
+ "Calls 'f' repeatedly with a delay of 'repeat-delay' milliseconds after the 'initial-delay' in milliseconds.
+ Returns an identifier for the scheduled job. An optional
+ group-id can be provided to collect a set of jobs into one group to allow
+ them to be stopped together.")
(stop-job [this job]
"Given an identifier of a scheduled job, stop its execution. If an
invocation of the job is currently executing, it will be allowed to
@@ -38,4 +58,12 @@
[this group-id]
"Return the number of jobs known to the scheduler service, or the number
of jobs known to the scheduler service by group id. A nil group-id
- will return the count of all jobs."))
+ will return the count of all jobs.")
+ (get-jobs
+ [this]
+ [this group-id]
+ "Return all the known job identifiers, or the job identifiers associated
+ with the given group."))
@@ -0,0 +1,74 @@
+(ns puppetlabs.trapperkeeper.services.scheduler.job
+ (:gen-class
+ :name puppetlabs.trapperkeeper.services.scheduler.job
+ :state state
+ :init init
+ :constructors {[] []}
+ :implements [org.quartz.StatefulJob org.quartz.InterruptableJob]
+ :prefix "-")
+ (:require [clojure.tools.logging :as log]
+ [puppetlabs.i18n.core :as i18n])
+ (:import (org.quartz JobExecutionContext JobDataMap JobExecutionException DateBuilder DateBuilder$IntervalUnit)
+ (java.util Date)))
+(defn -init
+ []
+ [[] (atom {})])
+(defn- recurring?
+ [options]
+ (or (contains? options :interval) (contains? options :interspaced)))
+(defn- calculate-next-execution-time
+ [context options]
+ (if (contains? options :interspaced)
+ (Date. ^Long (+ (System/currentTimeMillis) (:interspaced options)))
+ (.getFireTimeAfter (.getTrigger context) (Date.))))
+(defn- should-skip?
+ [context options]
+ (when (contains? options :interval)
+ (let [interval-ms (:interval options)
+ now-ms (System/currentTimeMillis)
+ scheduled-fire-time-ms (.getTime (.getScheduledFireTime context))]
+ ; skip this run if its scheduled fire time is more than one interval in the past.
+ (> now-ms (+ scheduled-fire-time-ms interval-ms)))))
+(defn -execute
+ [this ^JobExecutionContext context]
+ (try
+ (let [^JobDataMap merged (.getMergedJobDataMap context)
+ options (.get merged "jobData")
+ f (:job options)]
+ (swap! (.state this) into {:current-thread (Thread/currentThread)})
+ (if-not (should-skip? context options)
+ (f)
+ (log/info (i18n/trs "Skipping execution of job {0} because of missed interval." (.toString (.getKey (.getJobDetail context))))))
+ ; using quartz interval execution does not take into account the
+ ; execution time of the actual job. For interspaced jobs, this means
+ ; triggering the job after this one completes, for interval jobs,
+ ; this means fast-forwarding the execution time to the next logical
+ ; one
+ (when (recurring? options)
+ (let [scheduler (.getScheduler context)
+ oldTrigger (.getTrigger context)
+ future-date (calculate-next-execution-time context options)
+ trigger (-> (.getTriggerBuilder oldTrigger)
+ (.startAt future-date)
+ (.build))]
+ (.rescheduleJob scheduler (.getKey oldTrigger) trigger))))
+ (catch Throwable e
+ (log/error e (i18n/trs "scheduled job threw error"))
+ (let [new-exception (JobExecutionException. ^Throwable e)]
+ (.setUnscheduleFiringTrigger new-exception true)
+ (throw new-exception)))))
+(defn -interrupt
+ [this]
+ (when-let [thread (:current-thread @(.state this))]
+ (.interrupt thread)))
@@ -1,63 +1,169 @@
(ns puppetlabs.trapperkeeper.services.scheduler.scheduler-core
- (:require [overtone.at-at :as at-at]
- [clojure.tools.logging :as log]))
-(defn create-pool
- "Creates and returns a thread pool which can be used for scheduling jobs."
- []
- (at-at/mk-pool))
-(defn wrap-with-error-logging
- "Returns a function that will invoke 'f' inside a try/catch block. If an
- error occurs during execution of 'f', it will be logged and re-thrown."
- [f]
- (fn []
- (try
- (f)
- (catch Throwable t
- (log/error t "scheduled job threw error")
- (throw t)))))
+ (:require [clojure.tools.logging :as log]
+ [puppetlabs.i18n.core :as i18n]
+ [puppetlabs.kitchensink.core :as ks])
+ (:import (org.quartz.impl.matchers GroupMatcher)
+ (org.quartz.impl StdSchedulerFactory SchedulerRepository)
+ (org.quartz JobBuilder SimpleScheduleBuilder TriggerBuilder
+ Scheduler JobKey SchedulerException JobDataMap)
+ (org.quartz.utils Key)
+ (java.util Date UUID Properties)))
+(def shutdown-timeout-sec 30)
+(defn create-scheduler
+ "Creates and returns a scheduler with configured thread pool which
+ can be used for scheduling jobs."
+ [thread-count]
+ (let [config [["org.quartz.scheduler.skipUpdateCheck" "true"]
+ ["org.quartz.scheduler.instanceName" (.toString (UUID/randomUUID))]
+ ["org.quartz.threadPool.threadCount" (str thread-count)]]
+ props (.clone (System/getProperties))
+ _ (doseq [[k v] config]
+ (.setProperty props k v))
+ factory (StdSchedulerFactory. ^Properties props)
+ scheduler (.getScheduler factory)]
+ (.start scheduler)
+ scheduler))
+(defn build-executable-job
+ ([f job-name group-name] (build-executable-job f job-name group-name {}))
+ ([f job-name group-name options]
+ (when (nil? f)
+ (throw (IllegalArgumentException. ^String (i18n/trs "Scheduled function must be non-nil"))))
+ (let [jdm (JobDataMap.)
+ options (assoc options :job f)]
+ (.put jdm "jobData" options)
+ (-> (JobBuilder/newJob puppetlabs.trapperkeeper.services.scheduler.job)
+ (.withIdentity job-name group-name)
+ (.usingJobData jdm)
+ (.build)))))
(defn interspaced
- ; See docs on the service protocol,
- ; puppetlabs.enterprise.services.protocols.scheduler
- [n f pool]
- (let [job (wrap-with-error-logging f)]
- (-> (at-at/interspaced n job pool)
- :id ; return only the ID; do not leak the at-at RecurringJob instance
- )))
+ [n f ^Scheduler scheduler group-name]
+ (try
+ (let [job-name (Key/createUniqueName group-name)
+ job (build-executable-job f job-name group-name {:interspaced n})
+ schedule (SimpleScheduleBuilder/simpleSchedule)
+ trigger (-> (TriggerBuilder/newTrigger)
+ (.withSchedule schedule)
+ (.startNow)
+ (.build))]
+ (.scheduleJob scheduler job trigger)
+ (.getJobKey trigger))
+ (catch SchedulerException e
+ ; this can occur if the interface is being used while the scheduler is shutdown
+ (log/error e (i18n/trs "Failed to schedule job")))))
(defn after
- ; See docs on the service protocol,
- ; puppetlabs.enterprise.services.protocols.scheduler
- [n f pool]
- (let [job (wrap-with-error-logging f)]
- (-> (at-at/after n job pool)
- :id ; return only the ID; do not leak the at-at RecurringJob instance
- )))
+ [n f ^Scheduler scheduler group-name]
+ (try
+ (let [job-name (Key/createUniqueName group-name)
+ job (build-executable-job f job-name group-name)
+ future-date (Date. ^Long (+ (System/currentTimeMillis) n))
+ trigger (-> (TriggerBuilder/newTrigger)
+ (.startAt future-date)
+ (.build))]
+ (.scheduleJob scheduler job trigger)
+ (.getJobKey trigger))
+ (catch SchedulerException e
+ ; this can occur if the interface is being used while the scheduler is shutdown
+ (log/error e (i18n/trs "Failed to schedule job")))))
+(defn interval
+ [^Scheduler scheduler repeat-delay f group-name]
+ (try
+ (let [job-name (Key/createUniqueName group-name)
+ job (build-executable-job f job-name group-name {:interval repeat-delay})
+ schedule (-> (SimpleScheduleBuilder/simpleSchedule)
+ (.withIntervalInMilliseconds repeat-delay)
+ ; allow quartz to reschedule things outside "org.quartz.jobStore.misfireThreshold" using internal logic
+ ; this isn't sufficient for short interval jobs, so additional scheduling logic is included in the job itself
+ (.withMisfireHandlingInstructionNextWithRemainingCount)
+ (.repeatForever))
+ trigger (-> (TriggerBuilder/newTrigger)
+ (.withSchedule schedule)
+ (.startNow)
+ (.build))]
+ (.scheduleJob scheduler job trigger)
+ (.getJobKey trigger))
+ (catch SchedulerException e
+ ; this can occur if the interface is being used while the scheduler is shutdown
+ (log/error e (i18n/trs "Failed to schedule job")))))
+(defn interval-after
+ [^Scheduler scheduler initial-delay repeat-delay f group-name]
+ (try
+ (let [job-name (Key/createUniqueName group-name)
+ job (build-executable-job f job-name group-name {:interval repeat-delay})
+ schedule (-> (SimpleScheduleBuilder/simpleSchedule)
+ (.withIntervalInMilliseconds repeat-delay)
+ ; allow quartz to reschedule things outside "org.quartz.jobStore.misfireThreshold" using internal logic
+ ; this isn't sufficient for short interval jobs, so additional scheduling logic is included in the job itself
+ (.withMisfireHandlingInstructionNextWithRemainingCount)
+ (.repeatForever))
+ future-date (Date. ^Long (+ (System/currentTimeMillis) initial-delay))
+ trigger (-> (TriggerBuilder/newTrigger)
+ (.withSchedule schedule)
+ (.startAt future-date)
+ (.build))]
+ (.scheduleJob scheduler job trigger)
+ (.getJobKey trigger))
+ (catch SchedulerException e
+ ; this can occur if the interface is being used while the scheduler is shutdown
+ (log/error e (i18n/trs "Failed to schedule job")))))
(defn stop-job
- "Gracefully stops the job specified by 'id'."
- [id pool]
- (at-at/stop id pool))
+ "Returns true if the job was deleted, and false if the job wasn't found."
+ [^JobKey id ^Scheduler scheduler]
+ (try
+ (.deleteJob scheduler id)
+ (catch SchedulerException e
+ ; this can occur if the interface is being used while the scheduler is shutdown
+ (log/debug e (i18n/trs "Failure stopping job"))
+ false)))
+(defn get-all-jobs
+ [^Scheduler scheduler]
+ (try
+ (let [groups (seq (.getJobGroupNames scheduler))
+ extract-keys (fn [group-name] (seq (.getJobKeys scheduler (GroupMatcher/jobGroupEquals group-name))))]
+ (mapcat extract-keys groups))
+ (catch SchedulerException e
+ ; this can occur if the interface is being used while the scheduler is shutdown
+ (log/debug e (i18n/trs "Failure getting all jobs"))
+ [])))
(defn stop-all-jobs!
- "Stops all of the specified jobs."
- [jobs pool]
- (doseq [job jobs]
- (stop-job job pool))
- ; Shutdown at-at's thread pool. This is the only way to do it, which is
- ; unfortunate because it also resets the thread pool. It's possible to
- ; hack around this via ...
- ;
- ; (-> pool
- ; :pool-atom
- ; (deref)
- ; :thread-pool
- ; (.shutdown))
- ;
- ; ... but that is a horrible hack. I've opened an issue with at-at to add a
- ; function that can be called to just stop the pool and not also reset it -
- ; https://github.com/overtone/at-at/issues/13
- (at-at/stop-and-reset-pool! pool))
+ [^Scheduler scheduler]
+ (when-not (.isShutdown scheduler)
+ (try
+ (let [sr (SchedulerRepository/getInstance)
+ scheduler-name (.getSchedulerName scheduler)]
+ (doseq [job (get-all-jobs scheduler)]
+ (try
+ (.interrupt scheduler job)
+ (.deleteJob scheduler job)
+ (catch SchedulerException e
+ ; this can occur if the interface is being used while the scheduler is shutdown
+ (log/debug e (i18n/trs "Failure stopping job")))))
+ (when (= :timeout (ks/with-timeout shutdown-timeout-sec :timeout (.shutdown scheduler true)))
+ (log/info (i18n/trs "Failed to shutdown schedule service in {0} seconds" shutdown-timeout-sec))
+ (.shutdown scheduler))
+ ; explicitly remove the scheduler from the registry to prevent leaks. This can happen if the
+ ; jobs don't terminate immediately
+ (.remove sr scheduler-name))
+ (catch SchedulerException e
+ ; this can occur if the interface is being used while the scheduler is shutdown
+ (log/debug e (i18n/trs "Failure stopping all jobs"))))))
+(defn get-jobs-in-group
+ [^Scheduler scheduler group-id]
+ (try
+ (seq (.getJobKeys scheduler (GroupMatcher/jobGroupEquals group-id)))
+ (catch SchedulerException e
+ ; this can occur if the function is called when the scheduler is shutdown
+ (log/debug e (i18n/trs "Failure getting jobs in group"))
+ [])))
@@ -2,54 +2,18 @@
(:require [puppetlabs.trapperkeeper.services :as tk]
[puppetlabs.trapperkeeper.services.protocols.scheduler :refer :all]
[puppetlabs.trapperkeeper.services.scheduler.scheduler-core :as core]
- [clojure.tools.logging :as log]))
+ [clojure.tools.logging :as log]
+ [puppetlabs.i18n.core :as i18n])
+ (:import (org.quartz SchedulerException)))
; Internal "helper" functions
-(defn get-pool
+(defn get-scheduler
(-> this
- :pool))
-(defn get-jobs
- [this]
- (-> this
- (tk/service-context)
- :jobs))
-(defn- enqueue-job!
- ([this id]
- (enqueue-job! this id {}))
- ([this id opts]
- (let [result (assoc opts :job id)]
- (swap! (get-jobs this) conj result)
- result)))
-(defn- dequeue-job!
- ([this job]
- (swap! (get-jobs this) disj job))
- ([this id keyword]
- (when-let [item (first (filter #(= id (get % keyword)) @(get-jobs this)))]
- (swap! (get-jobs this) disj item))))
-(defn- after-job
- "Jobs run with `after` only execute once, and when done need to be reomved from
- the scheduled jobs set. This wraps the job's function so that the job is removed
- correctly from the set when it completes (or fails)."
- [this after-id f]
- (fn []
- (try
- (f)
- (finally
- (dequeue-job! this after-id :after-id)))))
-(defn- jobs-by-group-id
- [this group-id]
- (if group-id
- (filter #(= group-id (:group-id %)) @(get-jobs this))
- @(get-jobs this)))
+ :scheduler))
(defn- create-maybe-stop-job-fn
"given a stop-job function, return function that when given a job returns
@@ -57,65 +21,84 @@
(fn [job]
{:job job
- :stopped? (stop-fn job)}))
+ :stopped? (stop-fn job)}))
+(def default-group-name "SCHEDULER_DEFAULT")
+(defn safe-group-id
+ [group-id]
+ (if (and (not (keyword? group-id)) (empty? group-id))
+ default-group-name
+ (str group-id)))
; Trapperkeeper service definition
(tk/defservice scheduler-service
- []
+ [[:ConfigService get-in-config]]
(init [this context]
- (log/debug "Initializing Scheduler Service")
- (let [pool (core/create-pool)]
- (assoc context :pool pool
- :jobs (atom #{})
- :after-id (atom 0))))
+ (log/info (i18n/trs "Initializing Scheduler Service"))
+ ;; the default in Quartz is 10 threads, so make that the default if it isn't specified
+ (let [scheduler (core/create-scheduler (get-in-config [:scheduler :thread-count] 10))]
+ (assoc context :scheduler scheduler)))
(stop [this context]
- (log/debug "Shutting down Scheduler Service")
- ; Stop any jobs that are still running
- (core/stop-all-jobs! @(:jobs context) (get-pool this))
- (log/debug "Scheduler Service shutdown complete.")
+ (log/info (i18n/trs "Shutting down Scheduler Service"))
+ (core/stop-all-jobs! (get-scheduler this))
+ (log/info "Scheduler Service shutdown complete.")
(interspaced [this n f]
- (interspaced this n f nil))
+ (interspaced this n f default-group-name))
(interspaced [this n f group-id]
- (let [id (core/interspaced n f (get-pool this))]
- (enqueue-job! this id {:group-id group-id})))
+ (core/interspaced n f (get-scheduler this) (safe-group-id group-id)))
(after [this n f]
- (after this n f nil))
+ (after this n f default-group-name))
(after [this n f group-id]
- ; use after-id to identify the job for the cleanup "after-job" wrapper
- (let [after-id (swap! (:after-id (tk/service-context this)) inc)
- ; wrap the job function in a function that will remove the job from the job set when it is done
- wrapped-fn (after-job this after-id f)
- id (core/after n wrapped-fn (get-pool this))]
- (enqueue-job! this id {:after-id after-id
- :group-id group-id})))
+ (core/after n f (get-scheduler this) (safe-group-id group-id)))
+ (interval [this n f]
+ (interval this n f default-group-name))
+ (interval [this n f group-id]
+ (core/interval (get-scheduler this) n f (safe-group-id group-id)))
+ (interval-after [this initial-delay repeat-delay f]
+ (interval-after this initial-delay repeat-delay f default-group-name))
+ (interval-after [this initial-delay repeat-delay f group-id]
+ (core/interval-after (get-scheduler this) initial-delay repeat-delay f (safe-group-id group-id)))
(stop-job [this job]
- (let [result (core/stop-job (:job job) (get-pool this))]
- (dequeue-job! this job)
- result))
+ (core/stop-job job (get-scheduler this)))
- (stop-jobs this nil))
+ (stop-jobs this default-group-name))
(stop-jobs [this group-id]
- (let [jobs-by-group (jobs-by-group-id this group-id)]
+ (let [jobs-by-group (core/get-jobs-in-group (get-scheduler this) (safe-group-id group-id))]
(reduce conj []
(create-maybe-stop-job-fn (partial stop-job this))
- jobs-by-group))))
+ jobs-by-group))))
+ (get-jobs
+ [this]
+ (core/get-all-jobs (get-scheduler this)))
+ (get-jobs
+ [this group-id]
+ (core/get-jobs-in-group (get-scheduler this) (safe-group-id group-id)))
(count-jobs [this]
- (count-jobs this nil))
+ (let [jobs (core/get-all-jobs (get-scheduler this))]
+ (count jobs)))
(count-jobs [this group-id]
- (count (jobs-by-group-id this group-id))))
+ (count (core/get-jobs-in-group (get-scheduler this) (safe-group-id group-id)))))
@@ -1,17 +1,19 @@
(ns puppetlabs.trapperkeeper.services.scheduler.scheduler-service-test
(:require [clojure.test :refer :all]
+ [puppetlabs.kitchensink.core :as ks]
[puppetlabs.trapperkeeper.testutils.bootstrap :refer :all]
[puppetlabs.trapperkeeper.services.scheduler.scheduler-service :refer :all]
[puppetlabs.trapperkeeper.services.protocols.scheduler :refer :all]
- [puppetlabs.trapperkeeper.app :as tk]
- [overtone.at-at :as at-at]))
+ [puppetlabs.trapperkeeper.services.scheduler.scheduler-core :as sc]
+ [puppetlabs.trapperkeeper.app :as tk])
+ (:import [java.util.concurrent TimeUnit CountDownLatch]))
(deftest ^:integration test-interspaced
(testing "without group-id"
(with-app-with-empty-config app [scheduler-service]
(testing "interspaced"
(let [service (tk/get-service app :SchedulerService)
- num-runs 3 ; let it run a few times, but not too many
+ num-runs 10 ; let it run a few times, but not too many
interval 300
p (promise)
counter (atom 0)
@@ -40,7 +42,10 @@
(stop-job service job-id))
(testing (str "Each delay should be at least " interval "ms")
- (is (every? (fn [delay] (>= delay interval)) @delays)))))))
+ (is (every? (fn [delay] (>= delay interval)) @delays)))
+ (testing "can schedule far in the future"
+ (interspaced service 21026149688 (constantly nil)))))))
(testing "with group-id"
(with-app-with-empty-config app [scheduler-service]
@@ -82,16 +87,18 @@
(testing "without group-id"
(with-app-with-empty-config app [scheduler-service]
(testing "after"
- (let [delay 100]
+ (let [delay 100
+ service (tk/get-service app :SchedulerService)]
(testing "should execute at least " delay " milliseconds in the future"
(let [completed (promise)
- job #(deliver completed (System/currentTimeMillis))
- service (tk/get-service app :SchedulerService)]
+ job #(deliver completed (System/currentTimeMillis))]
(let [schedule-time (System/currentTimeMillis)]
(after service delay job)
(let [execution-time (deref completed)
actual-delay (- execution-time schedule-time)]
- (is (>= actual-delay delay))))))))))
+ (is (>= actual-delay delay))))))
+ (testing "can schedule far in the future"
+ (after service 21026149688 (constantly nil)))))))
(testing "with group-id"
(with-app-with-empty-config app [scheduler-service]
@@ -107,12 +114,6 @@
actual-delay (- execution-time schedule-time)]
(is (>= actual-delay delay)))))))))))
-; This test has a race condition, but it is very unlikley to occur in reality,
-; and so far it's actually been impossible to get this test to fail due to a
-; lost race. 'stop-job' is probably impossible to test deterministically.
-; It was decided that having a test with a race condition was better than no test
-; at all in this case, primarily due to the fact that the underlying scheduler
-; library (at-at) has no tests of its own. :(
(deftest ^:integration test-stop-job
(testing "without group-id"
(testing "stop-job lets a job complete but does not run it again"
@@ -141,7 +142,8 @@
(Thread/sleep 100)
(is (= original-start-time @start-time)))
(testing "there should be no other jobs running"
- (is (= 0 (count @(get-jobs service))))))))))
+ (is (= 0 (count-jobs service)))))))))
(testing "with group-id"
(testing "stop-job lets a job complete but does not run it again"
(with-app-with-empty-config app [scheduler-service]
@@ -169,11 +171,16 @@
(Thread/sleep 100)
(is (= original-start-time @start-time)))
(testing "there should be no other jobs running"
- (is (= 0 (count @(get-jobs service)))))))))))
+ (is (= 0 (count-jobs service))))))))))
(defn guaranteed-start-interval-job
([service interval]
- (guaranteed-start-interval-job service interval nil))
+ (let [started (promise)
+ job (fn []
+ (deliver started nil))
+ result (interspaced service interval job)]
+ (deref started)
+ result))
([service interval group-id]
(let [started (promise)
@@ -192,15 +199,15 @@
job-0 (guaranteed-start-interval-job service interval)]
(is (= 1 (count-jobs service)))
(let [job-1 (guaranteed-start-interval-job service interval)]
- (is (= 2 (count-jobs service))
+ (is (= 2 (count-jobs service)))
(let [job-2 (guaranteed-start-interval-job service interval)]
- (is (= 3 (count-jobs service)))
- (stop-job service job-0)
- (is (= 2 (count-jobs service)))
- (stop-job service job-1)
- (is (= 1 (count-jobs service)))
- (stop-job service job-2)
- (is (= 0 (count-jobs service)))))))))
+ (is (= 3 (count-jobs service)))
+ (stop-job service job-0)
+ (is (= 2 (count-jobs service)))
+ (stop-job service job-1)
+ (is (= 1 (count-jobs service)))
+ (stop-job service job-2)
+ (is (= 0 (count-jobs service))))))))
(testing "count-jobs shows correct number of group-id and non-group-id jobs"
(with-app-with-empty-config app [scheduler-service]
@@ -215,27 +222,27 @@
(let [job-1 (guaranteed-start-interval-job service interval)]
(is (= 3 (count-jobs service)))
(is (= 1 (count-jobs service group-id)))
- (let [group-id-job-1 (guaranteed-start-interval-job service interval group-id)]
+ (let [group-id-job-1 (guaranteed-start-interval-job service interval group-id)]
+ (is (= 4 (count-jobs service)))
+ (is (= 2 (count-jobs service group-id)))
+ (let [job-2 (guaranteed-start-interval-job service interval)]
+ (is (= 5 (count-jobs service)))
+ (is (= 2 (count-jobs service group-id)))
+ (stop-job service job-0)
(is (= 4 (count-jobs service)))
(is (= 2 (count-jobs service group-id)))
- (let [job-2 (guaranteed-start-interval-job service interval)]
- (is (= 5 (count-jobs service)))
- (is (= 2 (count-jobs service group-id)))
- (stop-job service job-0)
- (is (= 4 (count-jobs service)))
- (is (= 2 (count-jobs service group-id)))
- (stop-job service group-id-job-0)
- (is (= 3 (count-jobs service)))
- (is (= 1 (count-jobs service group-id)))
- (stop-job service job-1)
- (is (= 2 (count-jobs service)))
- (is (= 1 (count-jobs service group-id)))
- (stop-job service group-id-job-1)
- (is (= 1 (count-jobs service)))
- (is (= 0 (count-jobs service group-id)))
- (stop-job service job-2)
- (is (= 0 (count-jobs service)))
- (is (= 0 (count-jobs service group-id))))))))))
+ (stop-job service group-id-job-0)
+ (is (= 3 (count-jobs service)))
+ (is (= 1 (count-jobs service group-id)))
+ (stop-job service job-1)
+ (is (= 2 (count-jobs service)))
+ (is (= 1 (count-jobs service group-id)))
+ (stop-job service group-id-job-1)
+ (is (= 1 (count-jobs service)))
+ (is (= 0 (count-jobs service group-id)))
+ (stop-job service job-2)
+ (is (= 0 (count-jobs service)))
+ (is (= 0 (count-jobs service group-id))))))))))
(testing "after reduces count when complete"
(with-app-with-empty-config app [scheduler-service]
@@ -292,34 +299,356 @@
(is (= 1 (count-jobs service group-id-1))))
(testing "stopping by group id stops the job"
- (stop-jobs service group-id-1))
+ (stop-jobs service group-id-1)
(is (= 0 (count-jobs service)))
(is (= 0 (count-jobs service group-id-0)))
- (is (= 0 (count-jobs service group-id-1)))))))
+ (is (= 0 (count-jobs service group-id-1))))))))
-(defn schedule-random-jobs
- "Schedules several random jobs and returns their at/at IDs."
+(defn schedule-random-interspaced-jobs
+ "Schedules several random jobs and returns their JobKeys."
- (for [x [1 2 3]]
- (:job (interspaced service 1000 (constantly x))))))
-; In the future, it might be reasonable to add a function like this into the
-; scheduler service protocol. If so, this function can be deleted.
-(defn at-at-scheduled-jobs
- "Returns all of at-at's scheduled jobs."
- [service]
- (set (map :id (at-at/scheduled-jobs (get-pool service)))))
+ (for [x [1 2 3]]
+ (interspaced service 1000 (constantly x)))))
(deftest ^:integration test-shutdown
(testing "Any remaining jobs will be stopped when the service is stopped."
(let [app (bootstrap-services-with-empty-config [scheduler-service])
service (tk/get-service app :SchedulerService)
- job-ids (schedule-random-jobs service)]
+ job-ids (schedule-random-interspaced-jobs service)]
- (testing "at-at reports all of the jobs we just scheduled"
- (is (= job-ids (at-at-scheduled-jobs service))))
+ (testing "reports all of the jobs we just scheduled"
+ (is (= (set job-ids) (set (get-jobs service)))))
(testing "Stopping the service stops all of the scheduled jobs"
(tk/stop app)
- (is (= #{} (at-at-scheduled-jobs service)))))))
+ (is (empty? (get-jobs service))))))
+ (testing "Shutdown honors timeout and interrupts existing jobs"
+ ; redefine the default timeout so we don't have to wait forever
+ (with-redefs [sc/shutdown-timeout-sec 5]
+ (let [app (bootstrap-services-with-empty-config [scheduler-service])
+ service (tk/get-service app :SchedulerService)
+ is-test-done (promise)
+ job-done (promise)
+ ; run a job that waits on a promise that isn't ever delivered, but is interrupted
+ job (interspaced service 1000 (fn []
+ (try
+ (deref is-test-done)
+ (catch InterruptedException _
+ (deliver job-done true)))))]
+ (testing "job was correctly scheduled"
+ (is (= (set [job]) (set (get-jobs service)))))
+ (testing "Stopping the service does not block forever"
+ (is (not= :timeout (ks/with-timeout 10 :timeout (tk/stop app))))
+ (is (empty? (get-jobs service))))
+ (deref job-done)))))
+(defn distances
+ "Calculate the distances between each item in a sequence."
+ [v]
+ (map #(- %2 %1) v (rest v)))
+;; define some acceptable bounded accuracy ratings.
+;; as the jvm warms up, the accuracy increases
+(def accuracy-high 150)
+(def accuracy-low -50)
+(deftest ^:integration test-interval
+ (let [num-runs 10] ; let it run a few times, but not too many
+ (testing "when recurring is > wait-time"
+ (doseq [[recurring-delay wait-time] [[500 100] [250 100] [233 200]]]
+ (let [expected-delta (- recurring-delay wait-time)]
+ (testing (str "testing recurring-delay " recurring-delay " wait time " wait-time)
+ (testing "subsequent delays are observed"
+ (with-app-with-empty-config app [scheduler-service]
+ (let [service (tk/get-service app :SchedulerService)
+ counter (atom 0)
+ stop-job-promise (promise)
+ delays (atom [])
+ start-times (atom [])
+ last-completion-time (atom nil)
+ job (fn []
+ (let [local-counter (swap! counter inc)
+ start-time (System/currentTimeMillis)]
+ (when @last-completion-time
+ (let [delay (- start-time @last-completion-time)]
+ (swap! delays conj delay)))
+ (swap! start-times conj start-time)
+ (Thread/sleep wait-time)
+ ; The test is over!
+ (when (= local-counter num-runs)
+ (deliver stop-job-promise nil))
+ (reset! last-completion-time (System/currentTimeMillis))))
+ job-start-time (System/currentTimeMillis)
+ job-id (interval service recurring-delay job)]
+ (deref stop-job-promise)
+ (stop-job service job-id)
+ ; all the jobs should be stopped
+ (is (= 0 (count-jobs service)))
+ (testing (str "Each delay should be at less than " wait-time "ms (within accuracy bounds)")
+ ; time between executions - expected time between executions should be in accuracy range
+ (is (every? (fn [delay] (< accuracy-low (- delay expected-delta) accuracy-high)) @delays))
+ ; time between starting points of recurring task
+ (is (every? (fn [difference] (< accuracy-low (- recurring-delay difference) accuracy-high)) (distances @start-times)))))))))))
+ (testing "when recurring < wait-time"
+ (doseq [[recurring-delay wait-time expected-delta] [[100 333 67] [100 250 50] [100 2330 70]]]
+ (testing (str "testing recurring-delay " recurring-delay " wait time " wait-time)
+ (testing "subsequent delays are observed"
+ (with-app-with-empty-config app [scheduler-service]
+ (let [service (tk/get-service app :SchedulerService)
+ counter (atom 0)
+ stop-job-promise (promise)
+ delays (atom [])
+ start-times (atom [])
+ last-completion-time (atom nil)
+ job (fn []
+ (let [local-counter (swap! counter inc)
+ start-time (System/currentTimeMillis)]
+ (when @last-completion-time
+ (let [delay (- start-time @last-completion-time)]
+ (swap! delays conj delay)))
+ (swap! start-times conj start-time)
+ (Thread/sleep wait-time)
+ ; The test is over!
+ (when (= local-counter num-runs)
+ (deliver stop-job-promise nil))
+ (reset! last-completion-time (System/currentTimeMillis))))
+ job-id (interval service recurring-delay job)]
+ (deref stop-job-promise)
+ (stop-job service job-id)
+ ; all the jobs should be stopped
+ (is (= 0 (count-jobs service)))
+ (testing (str "Each delay should be at less than " expected-delta "ms (within accuracy bounds)")
+ ; time between executions - expected time between executions should be in accuracy range
+ (is (every? (fn [delay] (< accuracy-low (- delay expected-delta) accuracy-high)) @delays))
+ ; time between starting points of recurring task
+ (is (every? (fn [difference] (< accuracy-low (- (+ wait-time expected-delta) difference) accuracy-high)) (distances @start-times))))))))))))
+(deftest ^:integration test-interval-after
+ (let [num-runs 10]
+ (doseq [[initial-delay recurring-delay wait-time] [[333 500 100] [1000 250 100] [20 233 200]]]
+ (let [expected-delta (- recurring-delay wait-time)]
+ (testing (str "testing initial-delay " initial-delay "recurring-delay " recurring-delay " wait time " wait-time)
+ (testing "initial delay is correctly observed and subsequent delays are observed"
+ (with-app-with-empty-config app [scheduler-service]
+ (let [service (tk/get-service app :SchedulerService)
+ counter (atom 0)
+ stop-job-promise (promise)
+ first-completed-timestamp (promise)
+ delays (atom [])
+ start-times (atom [])
+ last-completion-time (atom nil)
+ job (fn []
+ (let [local-counter (swap! counter inc)
+ start-time (System/currentTimeMillis)]
+ (when (= 1 local-counter)
+ (deliver first-completed-timestamp start-time))
+ (when @last-completion-time
+ (let [delay (- start-time @last-completion-time)]
+ (swap! delays conj delay)))
+ (swap! start-times conj start-time)
+ (Thread/sleep wait-time)
+ ; The test is over!
+ (when (= local-counter num-runs)
+ (deliver stop-job-promise nil))
+ (reset! last-completion-time (System/currentTimeMillis))))
+ job-start-time (System/currentTimeMillis)
+ job-id (interval-after service initial-delay recurring-delay job)
+ result (deref first-completed-timestamp)]
+ (is (<= initial-delay (- result job-start-time)))
+ (deref stop-job-promise)
+ (stop-job service job-id)
+ ; all the jobs should be stopped
+ (is (= 0 (count-jobs service)))
+ (testing (str "Each delay should be at less than " wait-time "ms (within accuracy bounds)")
+ ; time between executions - expected time between executions should be in accuracy range
+ (is (every? (fn [delay] (< accuracy-low (- delay expected-delta) accuracy-high)) @delays))
+ ; time between starting points of recurring task
+ (is (every? (fn [difference] (< accuracy-low (- recurring-delay difference) accuracy-high)) (distances @start-times)))))))
+ (testing "initial delay is correctly observed and subsequent delays are observed with group"
+ (with-app-with-empty-config app [scheduler-service]
+ (let [service (tk/get-service app :SchedulerService)
+ counter (atom 0)
+ stop-job-promise (promise)
+ first-completed-timestamp (promise)
+ delays (atom [])
+ start-times (atom [])
+ last-completion-time (atom nil)
+ group-id :unique-group-id
+ job (fn []
+ (let [local-counter (swap! counter inc)
+ start-time (System/currentTimeMillis)]
+ (when (= 1 local-counter)
+ (deliver first-completed-timestamp start-time))
+ (when @last-completion-time
+ (let [delay (- start-time @last-completion-time)]
+ (swap! delays conj delay)))
+ (swap! start-times conj start-time)
+ (Thread/sleep wait-time)
+ ; The test is over!
+ (when (= local-counter num-runs)
+ (deliver stop-job-promise nil))
+ (reset! last-completion-time (System/currentTimeMillis))))
+ job-start-time (System/currentTimeMillis)
+ job-id (interval-after service initial-delay recurring-delay job group-id)
+ result (deref first-completed-timestamp)]
+ (is (<= initial-delay (- result job-start-time)))
+ (deref stop-job-promise)
+ (stop-jobs service group-id)
+ ;; all the jobs should be stopped
+ (is (= 0 (count-jobs service)))
+ (testing (str "Each delay should be at less than " wait-time "ms (within accuracy bounds)")
+ ; time to recur - time between executions - expected time between executions
+ (is (every? (fn [delay] (< accuracy-low (- delay expected-delta) accuracy-high)) @delays))
+ ; time between starting points of recurring task
+ (is (every? (fn [difference] (< accuracy-low (- recurring-delay difference) accuracy-high)) (distances @start-times))))))))))))
+(deftest ^:integration test-thread-starvation
+ ; override the default to set the number of thread-pool threads
+ (let [initial-thread-count 1]
+ (with-app-with-config app [scheduler-service] {:scheduler {:thread-count initial-thread-count}}
+ (let [service (tk/get-service app :SchedulerService)]
+ (testing "after"
+ (testing "all jobs execute, even if delayed"
+ (let [num-jobs 20
+ delay 100
+ execution-latch (CountDownLatch. num-jobs)
+ wait-time 500
+ release-the-hounds (promise)
+ count (atom 0)
+ start-times (atom [])
+ job (fn []
+ (deref release-the-hounds)
+ (swap! start-times conj (System/currentTimeMillis))
+ (Thread/sleep wait-time)
+ (swap! count inc)
+ (.countDown execution-latch))]
+ (doseq [job-index (range 0 num-jobs)]
+ (after service delay job))
+ ;; allow the jobs to run once they are all set up
+ (deliver release-the-hounds true)
+ (is (.await execution-latch 20 TimeUnit/SECONDS))
+ (is (= num-jobs @count))
+ (is (every? (fn [difference] (<= wait-time difference (+ wait-time accuracy-high))) (distances @start-times))))))
+ (testing "interspaced"
+ (testing "all jobs execute at least once before they run again"
+ ;; this demonstrates that previously ready jobs are favored over rescheduled jobs
+ (let [num-jobs 20
+ frequency 100
+ execution-latch (CountDownLatch. (* 3 num-jobs))
+ wait-time 250
+ release-the-hounds (promise)
+ run-count (atom 0)
+ executed-jobs (atom [])
+ create-identified-job (fn [id]
+ (fn []
+ (deref release-the-hounds)
+ (swap! executed-jobs conj id)
+ (Thread/sleep wait-time)
+ (swap! run-count inc)
+ (.countDown execution-latch)))]
+ (doseq [job-index (range 0 num-jobs)]
+ (interspaced service frequency (create-identified-job job-index)))
+ ;; allow the jobs to run once they are all set up
+ (deliver release-the-hounds true)
+ (is (.await execution-latch 20 TimeUnit/SECONDS))
+ ;; first 20 should be unique
+ (is (= 20 (count (distinct (take 20 @executed-jobs)))))
+ ;; second 20 should be unique
+ (is (= 20 (count (distinct (take 20 (drop 20 @executed-jobs))))))
+ ;; third 20 should be unique
+ (is (= 20 (count (distinct (take 20 (drop 40 @executed-jobs))))))
+ (stop-jobs service))))
+ (testing "interval"
+ (testing "interval jobs can starve other jobs"
+ ;; run 20 interval jobs starting about the same time (incremental times), with a wait time longer than the execution period
+ ;; with only one thread only one job should run successfully every time, the others should be starved out
+ (let [num-jobs 20
+ frequency 100
+ execution-latch (CountDownLatch. (* 3 num-jobs))
+ wait-time 250
+ release-the-hounds (promise)
+ run-count (atom 0)
+ executed-jobs (atom [])
+ create-identified-job (fn [id]
+ (fn []
+ (deref release-the-hounds)
+ (swap! executed-jobs conj id)
+ (Thread/sleep wait-time)
+ (swap! run-count inc)
+ (.countDown execution-latch)))]
+ (doseq [job-index (range 0 num-jobs)]
+ (interval service frequency (create-identified-job job-index)))
+ ;; allow the jobs to run once they are all set up
+ (deliver release-the-hounds true)
+ (is (.await execution-latch 20 TimeUnit/SECONDS))
+ (is (= 1 (count (distinct @executed-jobs))))
+ (stop-jobs service))))
+ (testing "Interval job correctly skips execution if thread is unavailable"
+ (let [num-jobs 3
+ execution-count (atom 0)
+ start-times (atom [])
+ stop-test (promise)
+ ; interval should run immediately, and then every 100 ms
+ interval-frequency 100
+ ; wait 50ms for the after
+ after-start-time 50
+ after-sleep-time 500
+ ; calculate the next logical interval time
+ effective-first-time (* (quot (+ after-start-time after-sleep-time interval-frequency) interval-frequency) interval-frequency)
+ interval-job (fn []
+ (swap! start-times conj (System/currentTimeMillis))
+ (if (= num-jobs (swap! execution-count inc))
+ (deliver stop-test true)))
+ after-job (fn []
+ (Thread/sleep after-sleep-time))]
+ (interval service interval-frequency interval-job)
+ (after service after-start-time after-job)
+ ; wait for the right number of interval jobs to execute
+ (deref stop-test)
+ (stop-jobs service)
+ ; the distance between the first two should be next interval of after-start-time + after-sleep time
+ (is (<= (+ effective-first-time accuracy-low) (first (distances @start-times)) (+ effective-first-time accuracy-high)))
+ ; the next run should be about the interval frequency
+ (is (<= (+ interval-frequency accuracy-low) (nth (distances @start-times) 1) (+ interval-frequency accuracy-high)))))))))
+(deftest ^:integration null-handling
+ (with-app-with-empty-config app [scheduler-service]
+ (let [service (tk/get-service app :SchedulerService)]
+ (testing "interspaced throws exception on nil function"
+ (is (thrown? IllegalArgumentException
+ (interspaced service 300 nil))))
+ (testing "after throws exception on nil function"
+ (is (thrown? IllegalArgumentException
+ (after service 300 nil))))
+ (testing "interval throws exception on nil function"
+ (is (thrown? IllegalArgumentException
+ (interval service 300 nil))))
+ (testing "interval-after throws exception on nil function"
+ (is (thrown? IllegalArgumentException
+ (interval-after service 1 300 nil)))))))
test/unit/puppetlabs/trapperkeeper/services/scheduler/scheduler_core_test.clj deleted
@@ -1,16 +0,0 @@
-(ns puppetlabs.trapperkeeper.services.scheduler.scheduler-core-test
- (:require [clojure.test :refer :all]
- [puppetlabs.trapperkeeper.services.scheduler.scheduler-core :refer :all]
- [puppetlabs.trapperkeeper.testutils.logging :refer :all]
- [schema.test :as schema-test]))
-(use-fixtures :once schema-test/validate-schemas)
-(deftest wrap-with-error-logging-test
- (testing "when a job throws an exception, it is logged and re-thrown"
- (let [f #(throw (Exception. "bummer"))]
- (with-test-logging
- (is (thrown-with-msg? Exception
- #"bummer"
- ((wrap-with-error-logging f))))
- (is (logged? #"scheduled job threw error" :error))))))
View it on GitLab: https://salsa.debian.org/java-team/trapperkeeper-scheduler-clojure/-/commit/3dacb894100003b87ef4de6eb8e0546f19832fcd