[Python-modules-commits] [django-background-tasks] 01/03: New upstream version 1.1.11
Hans-Christoph Steiner
eighthave at moszumanska.debian.org
Tue Sep 26 11:47:18 UTC 2017
This is an automated email from the git hooks/post-receive script.
eighthave pushed a commit to branch master
in repository django-background-tasks.
commit e683565d2a908fb5ad2c12be1302e16053c3b553
Author: Hans-Christoph Steiner <hans at eds.org>
Date: Mon Sep 25 22:09:47 2017 +0200
New upstream version 1.1.11
---
AUTHORS.txt | 17 +
LICENSE | 28 +
MANIFEST.in | 6 +
PKG-INFO | 47 ++
README.md | 22 +
background_task/__init__.py | 8 +
background_task/admin.py | 19 +
background_task/apps.py | 10 +
background_task/exceptions.py | 8 +
background_task/management/__init__.py | 0
background_task/management/commands/__init__.py | 0
.../management/commands/process_tasks.py | 101 +++
background_task/migrations/0001_initial.py | 65 ++
background_task/migrations/__init__.py | 0
background_task/models.py | 316 ++++++++
background_task/models_completed.py | 124 ++++
background_task/settings.py | 61 ++
background_task/signals.py | 34 +
background_task/tasks.py | 313 ++++++++
background_task/tests/__init__.py | 0
background_task/tests/test_settings.py | 47 ++
background_task/tests/test_settings_async.py | 4 +
background_task/tests/test_tasks.py | 825 +++++++++++++++++++++
background_task/utils.py | 31 +
classifiers | 15 +
django_background_tasks.egg-info/PKG-INFO | 47 ++
django_background_tasks.egg-info/SOURCES.txt | 32 +
.../dependency_links.txt | 1 +
django_background_tasks.egg-info/requires.txt | 2 +
django_background_tasks.egg-info/top_level.txt | 1 +
django_background_tasks.egg-info/zip-safe | 1 +
requirements.txt | 2 +
setup.cfg | 5 +
setup.py | 22 +
34 files changed, 2214 insertions(+)
diff --git a/AUTHORS.txt b/AUTHORS.txt
new file mode 100644
index 0000000..15cb1bf
--- /dev/null
+++ b/AUTHORS.txt
@@ -0,0 +1,17 @@
+Contributors
+
+* John Montgomery (lilspikey & johnsensible, initiator)
+* Yannik Ammann (yannik-ammann)
+* Luthaf (luthaf)
+* Philippe O. Wagner (philippeowagner)
+* weijia (weijia)
+* tdruez (tdruez)
+* Chad G. Hansen (chadgh)
+* Grant McConnaughey (grantmcconnaughey)
+* James Mason (bear454)
+* Pavel Zagrebelin (Zagrebelin)
+* Stephen Brown (december1981)
+* Adam Johnson (adamchainz)
+* (kherrett)
+
+Your name could stand here :)
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..b4a83e1
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,28 @@
+Copyright (c) 2015, arteria GmbH.
+Copyright (c) 2010, John Montgomery.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+ 1. Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+
+ 2. Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+
+ 3. Neither the name of Django Bakground Task nor the names of its
+ contributors may be used to endorse or promote products derived from
+ this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..8ca8c24
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,6 @@
+include README.md
+include LICENSE
+include AUTHORS.txt
+include requirements.txt
+include classifiers
+recursive-include tests *
diff --git a/PKG-INFO b/PKG-INFO
new file mode 100644
index 0000000..831f0c2
--- /dev/null
+++ b/PKG-INFO
@@ -0,0 +1,47 @@
+Metadata-Version: 1.1
+Name: django-background-tasks
+Version: 1.1.11
+Summary: Database backed asynchronous task queue
+Home-page: http://github.com/arteria/django-background-tasks
+Author: arteria GmbH, John Montgomery
+Author-email: admin at arteria.ch
+License: BSD
+Description: # Django Background Tasks
+
+ [Build Status](https://travis-ci.org/arteria/django-background-tasks)
+ [Coverage Status](https://coveralls.io/github/arteria/django-background-tasks?branch=master)
+ [Documentation Status](http://django-background-tasks.readthedocs.io/en/latest/?badge=latest)
+ [PyPI](https://pypi.python.org/pypi/django-background-tasks)
+
+
+ Django Background Task is a database-backed work queue for Django, loosely based around [Ruby's DelayedJob](https://github.com/tobi/delayed_job) library. This project was adopted and adapted from [lilspikey](https://github.com/lilspikey/) django-background-task.
+
+ To avoid conflicts on PyPI we renamed it to django-background-tasks (plural). For an easy upgrade from django-background-task to django-background-tasks, the internal module structure was left untouched.
+
+ In Django Background Task, all tasks are implemented as functions (or any other callable).
+
+ There are two parts to using background tasks:
+
+ * creating the task functions and registering them with the scheduler
+ * setting up a cron task (or long running process) to execute the tasks
+
+
+ ## Docs
+ See http://django-background-tasks.readthedocs.io/en/latest/.
+
+Platform: UNKNOWN
+Classifier: Development Status :: 5 - Production/Stable
+Classifier: Environment :: Web Environment
+Classifier: Intended Audience :: Developers
+Classifier: License :: OSI Approved :: BSD License
+Classifier: Operating System :: OS Independent
+Classifier: Topic :: Software Development :: Libraries :: Python Modules
+Classifier: Framework :: Django
+Classifier: Framework :: Django :: 1.4
+Classifier: Framework :: Django :: 1.5
+Classifier: Framework :: Django :: 1.6
+Classifier: Framework :: Django :: 1.7
+Classifier: Framework :: Django :: 1.8
+Classifier: Programming Language :: Python
+Classifier: Programming Language :: Python :: 2
+Classifier: Programming Language :: Python :: 3
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e9d9c47
--- /dev/null
+++ b/README.md
@@ -0,0 +1,22 @@
+# Django Background Tasks
+
+[Build Status](https://travis-ci.org/arteria/django-background-tasks)
+[Coverage Status](https://coveralls.io/github/arteria/django-background-tasks?branch=master)
+[Documentation Status](http://django-background-tasks.readthedocs.io/en/latest/?badge=latest)
+[PyPI](https://pypi.python.org/pypi/django-background-tasks)
+
+
+Django Background Task is a database-backed work queue for Django, loosely based around [Ruby's DelayedJob](https://github.com/tobi/delayed_job) library. This project was adopted and adapted from [lilspikey](https://github.com/lilspikey/) django-background-task.
+
+To avoid conflicts on PyPI we renamed it to django-background-tasks (plural). For an easy upgrade from django-background-task to django-background-tasks, the internal module structure was left untouched.
+
+In Django Background Task, all tasks are implemented as functions (or any other callable).
+
+There are two parts to using background tasks:
+
+* creating the task functions and registering them with the scheduler
+* setting up a cron task (or long running process) to execute the tasks
+
+
+## Docs
+See http://django-background-tasks.readthedocs.io/en/latest/.
diff --git a/background_task/__init__.py b/background_task/__init__.py
new file mode 100644
index 0000000..5263478
--- /dev/null
+++ b/background_task/__init__.py
@@ -0,0 +1,8 @@
+# -*- coding: utf-8 -*-
+__version__ = '1.1.11'
+
+default_app_config = 'background_task.apps.BackgroundTasksAppConfig'
+
+def background(*arg, **kw):
+ from background_task.tasks import tasks
+ return tasks.background(*arg, **kw)
diff --git a/background_task/admin.py b/background_task/admin.py
new file mode 100644
index 0000000..eccf998
--- /dev/null
+++ b/background_task/admin.py
@@ -0,0 +1,19 @@
+# -*- coding: utf-8 -*-
+from django.contrib import admin
+from background_task.models_completed import CompletedTask
+
+from background_task.models import Task
+
+
+class TaskAdmin(admin.ModelAdmin):
+ display_filter = ['task_name']
+ list_display = ['task_name', 'task_params', 'run_at', 'priority', 'attempts', 'has_error', 'locked_by', 'locked_by_pid_running', ]
+
+
+class CompletedTaskAdmin(admin.ModelAdmin):
+ display_filter = ['task_name']
+ list_display = ['task_name', 'task_params', 'run_at', 'priority', 'attempts', 'has_error', 'locked_by', 'locked_by_pid_running', ]
+
+
+admin.site.register(Task, TaskAdmin)
+admin.site.register(CompletedTask, CompletedTaskAdmin)
diff --git a/background_task/apps.py b/background_task/apps.py
new file mode 100644
index 0000000..d5f7a5d
--- /dev/null
+++ b/background_task/apps.py
@@ -0,0 +1,10 @@
+from django.apps import AppConfig
+
+
+class BackgroundTasksAppConfig(AppConfig):
+ name = 'background_task'
+ from background_task import __version__ as version_info
+ verbose_name = 'Background Tasks ({})'.format(version_info)
+
+ def ready(self):
+ import background_task.signals # noqa
diff --git a/background_task/exceptions.py b/background_task/exceptions.py
new file mode 100644
index 0000000..6ae0341
--- /dev/null
+++ b/background_task/exceptions.py
@@ -0,0 +1,8 @@
+# -*- coding: utf-8 -*-
+
+
+class BackgroundTaskError(Exception):
+
+ def __init__(self, message, errors=None):
+ super(Exception, self).__init__(message)
+ self.errors = errors
diff --git a/background_task/management/__init__.py b/background_task/management/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/background_task/management/commands/__init__.py b/background_task/management/commands/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/background_task/management/commands/process_tasks.py b/background_task/management/commands/process_tasks.py
new file mode 100644
index 0000000..c349388
--- /dev/null
+++ b/background_task/management/commands/process_tasks.py
@@ -0,0 +1,101 @@
+# -*- coding: utf-8 -*-
+import logging
+import random
+import sys
+import time
+
+from django import VERSION
+from django.core.management.base import BaseCommand
+
+from background_task.tasks import tasks, autodiscover
+from background_task.utils import SignalManager
+from compat import close_connection
+
+
+logger = logging.getLogger(__name__)
+
+
+def _configure_log_std():
+ class StdOutWrapper(object):
+ def write(self, s):
+ logger.info(s)
+
+ class StdErrWrapper(object):
+ def write(self, s):
+ logger.error(s)
+ sys.stdout = StdOutWrapper()
+ sys.stderr = StdErrWrapper()
+
+
+class Command(BaseCommand):
+ help = 'Run tasks that are scheduled to run on the queue'
+
+ # Command options are specified in an abstract way to enable Django < 1.8 compatibility
+ OPTIONS = (
+ (('--duration', ), {
+ 'action': 'store',
+ 'dest': 'duration',
+ 'type': int,
+ 'default': 0,
+ 'help': 'Run task for this many seconds (0 or less to run forever) - default is 0',
+ }),
+ (('--sleep', ), {
+ 'action': 'store',
+ 'dest': 'sleep',
+ 'type': float,
+ 'default': 5.0,
+ 'help': 'Sleep for this many seconds before checking for new tasks (if none were found) - default is 5',
+ }),
+ (('--queue', ), {
+ 'action': 'store',
+ 'dest': 'queue',
+ 'help': 'Only process tasks on this named queue',
+ }),
+ (('--log-std', ), {
+ 'action': 'store_true',
+ 'dest': 'log_std',
+ 'help': 'Redirect stdout and stderr to the logging system',
+ }),
+
+ )
+
+ if VERSION < (1, 8):
+ from optparse import make_option
+ option_list = BaseCommand.option_list + tuple([make_option(*args, **kwargs) for args, kwargs in OPTIONS])
+
+ # Used in Django >= 1.8
+ def add_arguments(self, parser):
+ for (args, kwargs) in self.OPTIONS:
+ parser.add_argument(*args, **kwargs)
+
+ def __init__(self, *args, **kwargs):
+ super(Command, self).__init__(*args, **kwargs)
+ self._tasks = tasks
+
+ def handle(self, *args, **options):
+ duration = options.pop('duration', 0)
+ sleep = options.pop('sleep', 5.0)
+ queue = options.pop('queue', None)
+ log_std = options.pop('log_std', False)
+ sig_manager = SignalManager()
+
+ if log_std:
+ _configure_log_std()
+
+ autodiscover()
+
+ start_time = time.time()
+
+ while (duration <= 0) or (time.time() - start_time) <= duration:
+ if sig_manager.kill_now:
+ # shutting down gracefully
+ break
+
+ if not self._tasks.run_next_task(queue):
+ # there were no tasks in the queue, let's recover.
+ close_connection()
+ logger.debug('waiting for tasks')
+ time.sleep(sleep)
+ else:
+ # there were some tasks to process, let's check if there is more work to do after a little break.
+ time.sleep(random.uniform(sig_manager.time_to_wait[0], sig_manager.time_to_wait[1]))
diff --git a/background_task/migrations/0001_initial.py b/background_task/migrations/0001_initial.py
new file mode 100644
index 0000000..b71cdc3
--- /dev/null
+++ b/background_task/migrations/0001_initial.py
@@ -0,0 +1,65 @@
+# -*- coding: utf-8 -*-
+# Generated by Django 1.10.6 on 2017-04-03 21:42
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+import django.db.models.deletion
+
+
+class Migration(migrations.Migration):
+
+ initial = True
+
+ dependencies = [
+ ('contenttypes', '0002_remove_content_type_name'),
+ ]
+
+ operations = [
+ migrations.CreateModel(
+ name='CompletedTask',
+ fields=[
+ ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+ ('task_name', models.CharField(db_index=True, max_length=255)),
+ ('task_params', models.TextField()),
+ ('task_hash', models.CharField(db_index=True, max_length=40)),
+ ('verbose_name', models.CharField(blank=True, max_length=255, null=True)),
+ ('priority', models.IntegerField(db_index=True, default=0)),
+ ('run_at', models.DateTimeField(db_index=True)),
+ ('repeat', models.BigIntegerField(choices=[(3600, 'hourly'), (86400, 'daily'), (604800, 'weekly'), (1209600, 'every 2 weeks'), (2419200, 'every 4 weeks'), (0, 'never')], default=0)),
+ ('repeat_until', models.DateTimeField(blank=True, null=True)),
+ ('queue', models.CharField(blank=True, db_index=True, max_length=255, null=True)),
+ ('attempts', models.IntegerField(db_index=True, default=0)),
+ ('failed_at', models.DateTimeField(blank=True, db_index=True, null=True)),
+ ('last_error', models.TextField(blank=True)),
+ ('locked_by', models.CharField(blank=True, db_index=True, max_length=64, null=True)),
+ ('locked_at', models.DateTimeField(blank=True, db_index=True, null=True)),
+ ('creator_object_id', models.PositiveIntegerField(blank=True, null=True)),
+ ('creator_content_type', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='completed_background_task', to='contenttypes.ContentType')),
+ ],
+ ),
+ migrations.CreateModel(
+ name='Task',
+ fields=[
+ ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+ ('task_name', models.CharField(db_index=True, max_length=255)),
+ ('task_params', models.TextField()),
+ ('task_hash', models.CharField(db_index=True, max_length=40)),
+ ('verbose_name', models.CharField(blank=True, max_length=255, null=True)),
+ ('priority', models.IntegerField(db_index=True, default=0)),
+ ('run_at', models.DateTimeField(db_index=True)),
+ ('repeat', models.BigIntegerField(choices=[(3600, 'hourly'), (86400, 'daily'), (604800, 'weekly'), (1209600, 'every 2 weeks'), (2419200, 'every 4 weeks'), (0, 'never')], default=0)),
+ ('repeat_until', models.DateTimeField(blank=True, null=True)),
+ ('queue', models.CharField(blank=True, db_index=True, max_length=255, null=True)),
+ ('attempts', models.IntegerField(db_index=True, default=0)),
+ ('failed_at', models.DateTimeField(blank=True, db_index=True, null=True)),
+ ('last_error', models.TextField(blank=True)),
+ ('locked_by', models.CharField(blank=True, db_index=True, max_length=64, null=True)),
+ ('locked_at', models.DateTimeField(blank=True, db_index=True, null=True)),
+ ('creator_object_id', models.PositiveIntegerField(blank=True, null=True)),
+ ('creator_content_type', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='background_task', to='contenttypes.ContentType')),
+ ],
+ options={
+ 'db_table': 'background_task',
+ },
+ ),
+ ]
diff --git a/background_task/migrations/__init__.py b/background_task/migrations/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/background_task/models.py b/background_task/models.py
new file mode 100644
index 0000000..726d6ef
--- /dev/null
+++ b/background_task/models.py
@@ -0,0 +1,316 @@
+# -*- coding: utf-8 -*-
+from datetime import timedelta
+from hashlib import sha1
+import json
+import logging
+import os
+import traceback
+
+from compat import StringIO
+from compat.models import GenericForeignKey
+from django.contrib.contenttypes.models import ContentType
+from django.db import models
+from django.db.models import Q
+from django.utils import timezone
+from django.utils.six import python_2_unicode_compatible
+
+from background_task.settings import app_settings
+from background_task.signals import task_failed, task_rescheduled
+
+
+logger = logging.getLogger(__name__)
+
+
+# inspired by http://github.com/tobi/delayed_job
+#
+
+
+class TaskQuerySet(models.QuerySet):
+
+ def created_by(self, creator):
+ """
+ :return: A Task queryset filtered by creator
+ """
+ content_type = ContentType.objects.get_for_model(creator)
+ return self.filter(
+ creator_content_type=content_type,
+ creator_object_id=creator.id,
+ )
+
+
+class TaskManager(models.Manager):
+
+ def get_queryset(self):
+ return TaskQuerySet(self.model, using=self._db)
+
+ def created_by(self, creator):
+ return self.get_queryset().created_by(creator)
+
+ def find_available(self, queue=None):
+ now = timezone.now()
+ qs = self.unlocked(now)
+ if queue:
+ qs = qs.filter(queue=queue)
+ ready = qs.filter(run_at__lte=now, failed_at=None)
+ _priority_ordering = '{}priority'.format(app_settings.BACKGROUND_TASK_PRIORITY_ORDERING)
+ ready = ready.order_by(_priority_ordering, 'run_at')
+
+ if app_settings.BACKGROUND_TASK_RUN_ASYNC:
+ currently_locked = self.locked(now).count()
+ count = app_settings.BACKGROUND_TASK_ASYNC_THREADS - currently_locked
+ if count > 0:
+ ready = ready[:count]
+ else:
+ ready = self.none()
+
+ return ready
+
+ def unlocked(self, now):
+ max_run_time = app_settings.BACKGROUND_TASK_MAX_RUN_TIME
+ qs = self.get_queryset()
+ expires_at = now - timedelta(seconds=max_run_time)
+ unlocked = Q(locked_by=None) | Q(locked_at__lt=expires_at)
+ return qs.filter(unlocked)
+
+ def locked(self, now):
+ max_run_time = app_settings.BACKGROUND_TASK_MAX_RUN_TIME
+ qs = self.get_queryset()
+ expires_at = now - timedelta(seconds=max_run_time)
+ locked = Q(locked_by__isnull=False) | Q(locked_at__gt=expires_at)
+ return qs.filter(locked)
+
+ def new_task(self, task_name, args=None, kwargs=None,
+ run_at=None, priority=0, queue=None, verbose_name=None, creator=None,
+ repeat=None, repeat_until=None):
+ args = args or ()
+ kwargs = kwargs or {}
+ if run_at is None:
+ run_at = timezone.now()
+
+ task_params = json.dumps((args, kwargs), sort_keys=True)
+ s = "%s%s" % (task_name, task_params)
+ task_hash = sha1(s.encode('utf-8')).hexdigest()
+ return Task(task_name=task_name,
+ task_params=task_params,
+ task_hash=task_hash,
+ priority=priority,
+ run_at=run_at,
+ queue=queue,
+ verbose_name=verbose_name,
+ creator=creator,
+ repeat=repeat or Task.NEVER,
+ repeat_until=repeat_until,
+ )
+
+ def get_task(self, task_name, args=None, kwargs=None):
+ args = args or ()
+ kwargs = kwargs or {}
+ task_params = json.dumps((args, kwargs), sort_keys=True)
+ s = "%s%s" % (task_name, task_params)
+ task_hash = sha1(s.encode('utf-8')).hexdigest()
+ qs = self.get_queryset()
+ return qs.filter(task_hash=task_hash)
+
+ def drop_task(self, task_name, args=None, kwargs=None):
+ return self.get_task(task_name, args, kwargs).delete()
+
+
+@python_2_unicode_compatible
+class Task(models.Model):
+ # the "name" of the task/function to be run
+ task_name = models.CharField(max_length=255, db_index=True)
+ # the json encoded parameters to pass to the task
+ task_params = models.TextField()
+ # a sha1 hash of the name and params, to lookup already scheduled tasks
+ task_hash = models.CharField(max_length=40, db_index=True)
+
+ verbose_name = models.CharField(max_length=255, null=True, blank=True)
+
+ # what priority the task has
+ priority = models.IntegerField(default=0, db_index=True)
+ # when the task should be run
+ run_at = models.DateTimeField(db_index=True)
+
+ # Repeat choices are encoded as number of seconds
+ # The repeat implementation is based on this encoding
+ HOURLY = 3600
+ DAILY = 24 * HOURLY
+ WEEKLY = 7 * DAILY
+ EVERY_2_WEEKS = 2 * WEEKLY
+ EVERY_4_WEEKS = 4 * WEEKLY
+ NEVER = 0
+ REPEAT_CHOICES = (
+ (HOURLY, 'hourly'),
+ (DAILY, 'daily'),
+ (WEEKLY, 'weekly'),
+ (EVERY_2_WEEKS, 'every 2 weeks'),
+ (EVERY_4_WEEKS, 'every 4 weeks'),
+ (NEVER, 'never'),
+ )
+ repeat = models.BigIntegerField(choices=REPEAT_CHOICES, default=NEVER)
+ repeat_until = models.DateTimeField(null=True, blank=True)
+
+ # the "name" of the queue this is to be run on
+ queue = models.CharField(max_length=255, db_index=True,
+ null=True, blank=True)
+
+ # how many times the task has been tried
+ attempts = models.IntegerField(default=0, db_index=True)
+ # when the task last failed
+ failed_at = models.DateTimeField(db_index=True, null=True, blank=True)
+ # details of the error that occurred
+ last_error = models.TextField(blank=True)
+
+ # details of who's trying to run the task at the moment
+ locked_by = models.CharField(max_length=64, db_index=True,
+ null=True, blank=True)
+ locked_at = models.DateTimeField(db_index=True, null=True, blank=True)
+
+ creator_content_type = models.ForeignKey(
+ ContentType, null=True, blank=True,
+ related_name='background_task', on_delete=models.CASCADE
+ )
+ creator_object_id = models.PositiveIntegerField(null=True, blank=True)
+ creator = GenericForeignKey('creator_content_type', 'creator_object_id')
+
+ objects = TaskManager()
+
+ def locked_by_pid_running(self):
+ """
+ Check if the locked_by process is still running.
+ """
+ if self.locked_by:
+ try:
+ # won't kill the process. kill is a bad named system call
+ os.kill(int(self.locked_by), 0)
+ return True
+ except:
+ return False
+ else:
+ return None
+ locked_by_pid_running.boolean = True
+
+ def has_error(self):
+ """
+ Check if the last_error field is empty.
+ """
+ return bool(self.last_error)
+ has_error.boolean = True
+
+ def params(self):
+ args, kwargs = json.loads(self.task_params)
+ # need to coerce kwargs keys to str
+ kwargs = dict((str(k), v) for k, v in kwargs.items())
+ return args, kwargs
+
+ def lock(self, locked_by):
+ now = timezone.now()
+ unlocked = Task.objects.unlocked(now).filter(pk=self.pk)
+ updated = unlocked.update(locked_by=locked_by, locked_at=now)
+ if updated:
+ return Task.objects.get(pk=self.pk)
+ return None
+
+ def _extract_error(self, type, err, tb):
+ file = StringIO()
+ traceback.print_exception(type, err, tb, None, file)
+ return file.getvalue()
+
+ def increment_attempts(self):
+ self.attempts += 1
+ self.save()
+
+ def has_reached_max_attempts(self):
+ max_attempts = app_settings.BACKGROUND_TASK_MAX_ATTEMPTS
+ return self.attempts >= max_attempts
+
+ def is_repeating_task(self):
+ return self.repeat > self.NEVER
+
+ def reschedule(self, type, err, traceback):
+ '''
+ Set a new time to run the task in future, or create a CompletedTask and delete the Task
+ if it has reached the maximum of allowed attempts
+ '''
+ self.last_error = self._extract_error(type, err, traceback)
+ self.increment_attempts()
+ if self.has_reached_max_attempts():
+ self.failed_at = timezone.now()
+ logger.warning('Marking task %s as failed', self)
+ completed = self.create_completed_task()
+ task_failed.send(sender=self.__class__, task_id=self.id, completed_task=completed)
+ self.delete()
+ else:
+ backoff = timedelta(seconds=(self.attempts ** 4) + 5)
+ self.run_at = timezone.now() + backoff
+ logger.warning('Rescheduling task %s for %s later at %s', self,
+ backoff, self.run_at)
+ task_rescheduled.send(sender=self.__class__, task=self)
+ self.locked_by = None
+ self.locked_at = None
+ self.save()
+
+ def create_completed_task(self):
+ '''
+ Returns a new CompletedTask instance with the same values
+ '''
+ from background_task.models_completed import CompletedTask
+ completed_task = CompletedTask(
+ task_name=self.task_name,
+ task_params=self.task_params,
+ task_hash=self.task_hash,
+ priority=self.priority,
+ run_at=timezone.now(),
+ queue=self.queue,
+ attempts=self.attempts,
+ failed_at=self.failed_at,
+ last_error=self.last_error,
+ locked_by=self.locked_by,
+ locked_at=self.locked_at,
+ verbose_name=self.verbose_name,
+ creator=self.creator,
+ repeat=self.repeat,
+ repeat_until=self.repeat_until,
+ )
+ completed_task.save()
+ return completed_task
+
+ def create_repetition(self):
+ """
+ :return: A new Task with an offset of self.repeat, or None if the self.repeat_until is reached
+ """
+ if not self.is_repeating_task():
+ return None
+
+ if self.repeat_until and self.repeat_until <= timezone.now():
+ # Repeat chain completed
+ return None
+
+ args, kwargs = self.params()
+ new_run_at = self.run_at + timedelta(seconds=self.repeat)
+
+ new_task = TaskManager().new_task(
+ task_name=self.task_name,
+ args=args,
+ kwargs=kwargs,
+ run_at=new_run_at,
+ priority=self.priority,
+ queue=self.queue,
+ verbose_name=self.verbose_name,
+ creator=self.creator,
+ repeat=self.repeat,
+ repeat_until=self.repeat_until,
+ )
+ new_task.save()
+ return new_task
+
+ def save(self, *arg, **kw):
+ # force NULL rather than empty string
+ self.locked_by = self.locked_by or None
+ return super(Task, self).save(*arg, **kw)
+
+ def __str__(self):
+ return u'{}'.format(self.verbose_name or self.task_name)
+
+ class Meta:
+ db_table = 'background_task'
diff --git a/background_task/models_completed.py b/background_task/models_completed.py
new file mode 100644
index 0000000..1f7b444
--- /dev/null
+++ b/background_task/models_completed.py
@@ -0,0 +1,124 @@
+# -*- coding: utf-8 -*-
+import os
+
+from compat.models import GenericForeignKey
+from django.contrib.contenttypes.models import ContentType
+
+from django.db import models
+from django.utils import timezone
+from django.utils.six import python_2_unicode_compatible
+
+from background_task.models import Task
+
+
+class CompletedTaskQuerySet(models.QuerySet):
+
+ def created_by(self, creator):
+ """
+ :return: A CompletedTask queryset filtered by creator
+ """
+ content_type = ContentType.objects.get_for_model(creator)
+ return self.filter(
+ creator_content_type=content_type,
+ creator_object_id=creator.id,
+ )
+
+ def failed(self, within=None):
+ """
+ :param within: A timedelta object
+ :return: A queryset of CompletedTasks that failed within the given timeframe (e.g. less than 1h ago)
+ """
+ qs = self.filter(
+ failed_at__isnull=False,
+ )
+ if within:
+ time_limit = timezone.now() - within
+ qs = qs.filter(failed_at__gt=time_limit)
+ return qs
+
+ def succeeded(self, within=None):
+ """
+ :param within: A timedelta object
+ :return: A queryset of CompletedTasks that completed successfully within the given timeframe
+ (e.g. less than 1h ago)
+ """
+ qs = self.filter(
+ failed_at__isnull=True,
+ )
+ if within:
+ time_limit = timezone.now() - within
+ qs = qs.filter(run_at__gt=time_limit)
+ return qs
+
+
+@python_2_unicode_compatible
+class CompletedTask(models.Model):
+ # the "name" of the task/function to be run
+ task_name = models.CharField(max_length=255, db_index=True)
+ # the json encoded parameters to pass to the task
+ task_params = models.TextField()
+ # a sha1 hash of the name and params, to lookup already scheduled tasks
+ task_hash = models.CharField(max_length=40, db_index=True)
+
+ verbose_name = models.CharField(max_length=255, null=True, blank=True)
+
+ # what priority the task has
+ priority = models.IntegerField(default=0, db_index=True)
+ # when the task should be run
+ run_at = models.DateTimeField(db_index=True)
+
+ repeat = models.BigIntegerField(choices=Task.REPEAT_CHOICES, default=Task.NEVER)
+ repeat_until = models.DateTimeField(null=True, blank=True)
+
+ # the "name" of the queue this is to be run on
+ queue = models.CharField(max_length=255, db_index=True,
+ null=True, blank=True)
+
+ # how many times the task has been tried
+ attempts = models.IntegerField(default=0, db_index=True)
+ # when the task last failed
+ failed_at = models.DateTimeField(db_index=True, null=True, blank=True)
+ # details of the error that occurred
+ last_error = models.TextField(blank=True)
+
+ # details of who's trying to run the task at the moment
+ locked_by = models.CharField(max_length=64, db_index=True,
+ null=True, blank=True)
+ locked_at = models.DateTimeField(db_index=True, null=True, blank=True)
+
+ creator_content_type = models.ForeignKey(
+ ContentType, null=True, blank=True,
+ related_name='completed_background_task', on_delete=models.CASCADE
+ )
+ creator_object_id = models.PositiveIntegerField(null=True, blank=True)
+ creator = GenericForeignKey('creator_content_type', 'creator_object_id')
+
+ objects = CompletedTaskQuerySet.as_manager()
+
+ def locked_by_pid_running(self):
+ """
+ Check if the locked_by process is still running.
+ """
+ if self.locked_by:
+ try:
+ # won't kill the process. kill is a bad named system call
+ os.kill(int(self.locked_by), 0)
+ return True
+ except:
+ return False
+ else:
+ return None
+ locked_by_pid_running.boolean = True
+
+ def has_error(self):
+ """
+ Check if the last_error field is empty.
+ """
+ return bool(self.last_error)
+ has_error.boolean = True
+
+ def __str__(self):
+ return u'{} - {}'.format(
+ self.verbose_name or self.task_name,
+ self.run_at,
+ )
diff --git a/background_task/settings.py b/background_task/settings.py
new file mode 100644
index 0000000..6a60812
--- /dev/null
+++ b/background_task/settings.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+import multiprocessing
+
+from django.conf import settings
+
+try:
+ cpu_count = multiprocessing.cpu_count()
+except Exception:
+ cpu_count = 1
+
+
+class AppSettings(object):
+ """
+ """
+ @property
+ def MAX_ATTEMPTS(self):
+ """Control how many times a task will be attempted."""
+ return getattr(settings, 'MAX_ATTEMPTS', 25)
+
+ @property
+ def BACKGROUND_TASK_MAX_ATTEMPTS(self):
+ """Control how many times a task will be attempted."""
+ return self.MAX_ATTEMPTS
+
+ @property
+ def MAX_RUN_TIME(self):
+ """Maximum possible task run time, after which tasks will be unlocked and tried again."""
+ return getattr(settings, 'MAX_RUN_TIME', 3600)
+
+ @property
+ def BACKGROUND_TASK_MAX_RUN_TIME(self):
+ """Maximum possible task run time, after which tasks will be unlocked and tried again."""
+ return self.MAX_RUN_TIME
+
+ @property
+ def BACKGROUND_TASK_RUN_ASYNC(self):
+ """Control if tasks will run asynchronous in a ThreadPool."""
+ return getattr(settings, 'BACKGROUND_TASK_RUN_ASYNC', False)
+
+ @property
+ def BACKGROUND_TASK_ASYNC_THREADS(self):
+ """Specify number of concurrent threads."""
+ return getattr(settings, 'BACKGROUND_TASK_ASYNC_THREADS', cpu_count)
+
+ @property
+ def BACKGROUND_TASK_PRIORITY_ORDERING(self):
+ """
+ Control the ordering of tasks in the queue.
+ Choose either `DESC` or `ASC`.
+
+ https://en.m.wikipedia.org/wiki/Nice_(Unix)
+ A niceness of −20 is the highest priority and 19 is the lowest priority. The default niceness for processes is inherited from its parent process and is usually 0.
+ """
+ order = getattr(settings, 'BACKGROUND_TASK_PRIORITY_ORDERING', 'DESC')
+ if order == 'ASC':
+ prefix = ''
+ else:
+ prefix = '-'
+ return prefix
+
+app_settings = AppSettings()
diff --git a/background_task/signals.py b/background_task/signals.py
new file mode 100644
index 0000000..f927d7f
--- /dev/null
+++ b/background_task/signals.py
@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+import django.dispatch
+from django.db import connections
+from background_task.settings import app_settings
+
+task_created = django.dispatch.Signal(providing_args=['task'])
+task_error = django.dispatch.Signal(providing_args=['task'])
+task_rescheduled = django.dispatch.Signal(providing_args=['task'])
+task_failed = django.dispatch.Signal(providing_args=['task_id', 'completed_task'])
+task_successful = django.dispatch.Signal(providing_args=['task_id', 'completed_task'])
+task_started = django.dispatch.Signal()
+task_finished = django.dispatch.Signal()
+
+
+# Register an event to reset saved queries when a Task is started.
+def reset_queries(**kwargs):
+ if app_settings.BACKGROUND_TASK_RUN_ASYNC:
+ for conn in connections.all():
+ conn.queries_log.clear()
+
+
+task_started.connect(reset_queries)
+
+
+# Register an event to reset transaction state and close connections past
+# their lifetime.
... 1451 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/django-background-tasks.git
More information about the Python-modules-commits
mailing list