[Python-modules-commits] [django-background-tasks] 01/03: New upstream version 1.1.11

Hans-Christoph Steiner eighthave at moszumanska.debian.org
Tue Sep 26 11:47:18 UTC 2017


This is an automated email from the git hooks/post-receive script.

eighthave pushed a commit to branch master
in repository django-background-tasks.

commit e683565d2a908fb5ad2c12be1302e16053c3b553
Author: Hans-Christoph Steiner <hans at eds.org>
Date:   Mon Sep 25 22:09:47 2017 +0200

    New upstream version 1.1.11
---
 AUTHORS.txt                                        |  17 +
 LICENSE                                            |  28 +
 MANIFEST.in                                        |   6 +
 PKG-INFO                                           |  47 ++
 README.md                                          |  22 +
 background_task/__init__.py                        |   8 +
 background_task/admin.py                           |  19 +
 background_task/apps.py                            |  10 +
 background_task/exceptions.py                      |   8 +
 background_task/management/__init__.py             |   0
 background_task/management/commands/__init__.py    |   0
 .../management/commands/process_tasks.py           | 101 +++
 background_task/migrations/0001_initial.py         |  65 ++
 background_task/migrations/__init__.py             |   0
 background_task/models.py                          | 316 ++++++++
 background_task/models_completed.py                | 124 ++++
 background_task/settings.py                        |  61 ++
 background_task/signals.py                         |  34 +
 background_task/tasks.py                           | 313 ++++++++
 background_task/tests/__init__.py                  |   0
 background_task/tests/test_settings.py             |  47 ++
 background_task/tests/test_settings_async.py       |   4 +
 background_task/tests/test_tasks.py                | 825 +++++++++++++++++++++
 background_task/utils.py                           |  31 +
 classifiers                                        |  15 +
 django_background_tasks.egg-info/PKG-INFO          |  47 ++
 django_background_tasks.egg-info/SOURCES.txt       |  32 +
 .../dependency_links.txt                           |   1 +
 django_background_tasks.egg-info/requires.txt      |   2 +
 django_background_tasks.egg-info/top_level.txt     |   1 +
 django_background_tasks.egg-info/zip-safe          |   1 +
 requirements.txt                                   |   2 +
 setup.cfg                                          |   5 +
 setup.py                                           |  22 +
 34 files changed, 2214 insertions(+)

diff --git a/AUTHORS.txt b/AUTHORS.txt
new file mode 100644
index 0000000..15cb1bf
--- /dev/null
+++ b/AUTHORS.txt
@@ -0,0 +1,17 @@
+Contributors
+
+* John Montgomery (lilspikey & johnsensible, initiator)
+* Yannik Ammann (yannik-ammann)
+* Luthaf (luthaf)
+* Philippe O. Wagner (philippeowagner)
+* weijia (weijia)
+* tdruez (tdruez)
+* Chad G. Hansen (chadgh)
+* Grant McConnaughey (grantmcconnaughey)
+* James Mason (bear454)
+* Pavel Zagrebelin (Zagrebelin)
+* Stephen Brown (december1981)
+* Adam Johnson (adamchainz)
+* (kherrett)
+
+Your name could stand here :)
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..b4a83e1
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,28 @@
+Copyright (c) 2015, arteria GmbH.
+Copyright (c) 2010, John Montgomery.
+All rights reserved.
+ 
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+ 
+    1. Redistributions of source code must retain the above copyright notice, 
+       this list of conditions and the following disclaimer.
+    
+    2. Redistributions in binary form must reproduce the above copyright 
+       notice, this list of conditions and the following disclaimer in the
+       documentation and/or other materials provided with the distribution.
+ 
+    3. Neither the name of Django Bakground Task nor the names of its 
+       contributors may be used to endorse or promote products derived from 
+       this software without specific prior written permission.
+ 
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..8ca8c24
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,6 @@
+include README.md
+include LICENSE
+include AUTHORS.txt
+include requirements.txt
+include classifiers
+recursive-include tests *
diff --git a/PKG-INFO b/PKG-INFO
new file mode 100644
index 0000000..831f0c2
--- /dev/null
+++ b/PKG-INFO
@@ -0,0 +1,47 @@
+Metadata-Version: 1.1
+Name: django-background-tasks
+Version: 1.1.11
+Summary: Database backed asynchronous task queue
+Home-page: http://github.com/arteria/django-background-tasks
+Author: arteria GmbH, John Montgomery
+Author-email: admin at arteria.ch
+License: BSD
+Description: # Django Background Tasks
+        
+        [![Build Status](https://travis-ci.org/arteria/django-background-tasks.svg?branch=master)](https://travis-ci.org/arteria/django-background-tasks)
+        [![Coverage Status](https://coveralls.io/repos/arteria/django-background-tasks/badge.svg?branch=master&service=github)](https://coveralls.io/github/arteria/django-background-tasks?branch=master)
+        [![Documentation Status](https://readthedocs.org/projects/django-background-tasks/badge/?version=latest)](http://django-background-tasks.readthedocs.io/en/latest/?badge=latest)
+        [![PyPI](https://img.shields.io/pypi/v/django-background-tasks.svg)](https://pypi.python.org/pypi/django-background-tasks)
+        
+        
+        Django Background Task is a databased-backed work queue for Django, loosely based around [Ruby's DelayedJob](https://github.com/tobi/delayed_job) library. This project was adopted and adapted from [lilspikey](https://github.com/lilspikey/) django-background-task.
+        
+        To avoid conflicts on PyPI we renamed it to django-background-tasks (plural). For an easy upgrade from django-background-task to django-background-tasks, the internal module structure were left untouched.
+        
+        In Django Background Task, all tasks are implemented as functions (or any other callable).
+        
+        There are two parts to using background tasks:
+        
+        * creating the task functions and registering them with the scheduler
+        * setup a cron task (or long running process) to execute the tasks
+        
+        
+        ## Docs
+        See http://django-background-tasks.readthedocs.io/en/latest/.
+        
+Platform: UNKNOWN
+Classifier: Development Status :: 5 - Production/Stable
+Classifier: Environment :: Web Environment
+Classifier: Intended Audience :: Developers
+Classifier: License :: OSI Approved :: BSD License
+Classifier: Operating System :: OS Independent
+Classifier: Topic :: Software Development :: Libraries :: Python Modules
+Classifier: Framework :: Django
+Classifier: Framework :: Django :: 1.4
+Classifier: Framework :: Django :: 1.5
+Classifier: Framework :: Django :: 1.6
+Classifier: Framework :: Django :: 1.7
+Classifier: Framework :: Django :: 1.8
+Classifier: Programming Language :: Python
+Classifier: Programming Language :: Python :: 2
+Classifier: Programming Language :: Python :: 3
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e9d9c47
--- /dev/null
+++ b/README.md
@@ -0,0 +1,22 @@
+# Django Background Tasks
+
+[![Build Status](https://travis-ci.org/arteria/django-background-tasks.svg?branch=master)](https://travis-ci.org/arteria/django-background-tasks)
+[![Coverage Status](https://coveralls.io/repos/arteria/django-background-tasks/badge.svg?branch=master&service=github)](https://coveralls.io/github/arteria/django-background-tasks?branch=master)
+[![Documentation Status](https://readthedocs.org/projects/django-background-tasks/badge/?version=latest)](http://django-background-tasks.readthedocs.io/en/latest/?badge=latest)
+[![PyPI](https://img.shields.io/pypi/v/django-background-tasks.svg)](https://pypi.python.org/pypi/django-background-tasks)
+
+
+Django Background Task is a databased-backed work queue for Django, loosely based around [Ruby's DelayedJob](https://github.com/tobi/delayed_job) library. This project was adopted and adapted from [lilspikey](https://github.com/lilspikey/) django-background-task.
+
+To avoid conflicts on PyPI we renamed it to django-background-tasks (plural). For an easy upgrade from django-background-task to django-background-tasks, the internal module structure were left untouched.
+
+In Django Background Task, all tasks are implemented as functions (or any other callable).
+
+There are two parts to using background tasks:
+
+* creating the task functions and registering them with the scheduler
+* setup a cron task (or long running process) to execute the tasks
+
+
+## Docs
+See http://django-background-tasks.readthedocs.io/en/latest/.
diff --git a/background_task/__init__.py b/background_task/__init__.py
new file mode 100644
index 0000000..5263478
--- /dev/null
+++ b/background_task/__init__.py
@@ -0,0 +1,8 @@
+# -*- coding: utf-8 -*-
+__version__ = '1.1.11'
+
+default_app_config = 'background_task.apps.BackgroundTasksAppConfig'
+
+def background(*arg, **kw):
+    from background_task.tasks import tasks
+    return tasks.background(*arg, **kw)
diff --git a/background_task/admin.py b/background_task/admin.py
new file mode 100644
index 0000000..eccf998
--- /dev/null
+++ b/background_task/admin.py
@@ -0,0 +1,19 @@
+# -*- coding: utf-8 -*-
+from django.contrib import admin
+from background_task.models_completed import CompletedTask
+
+from background_task.models import Task
+
+
+class TaskAdmin(admin.ModelAdmin):
+    display_filter = ['task_name']
+    list_display = ['task_name', 'task_params', 'run_at', 'priority', 'attempts', 'has_error', 'locked_by', 'locked_by_pid_running', ]
+
+
+class CompletedTaskAdmin(admin.ModelAdmin):
+    display_filter = ['task_name']
+    list_display = ['task_name', 'task_params', 'run_at', 'priority', 'attempts', 'has_error', 'locked_by', 'locked_by_pid_running', ]
+
+
+admin.site.register(Task, TaskAdmin)
+admin.site.register(CompletedTask, CompletedTaskAdmin)
diff --git a/background_task/apps.py b/background_task/apps.py
new file mode 100644
index 0000000..d5f7a5d
--- /dev/null
+++ b/background_task/apps.py
@@ -0,0 +1,10 @@
+from django.apps import AppConfig
+
+
+class BackgroundTasksAppConfig(AppConfig):
+    name = 'background_task'
+    from background_task import __version__ as version_info
+    verbose_name = 'Background Tasks ({})'.format(version_info)
+
+    def ready(self):
+        import background_task.signals  # noqa
diff --git a/background_task/exceptions.py b/background_task/exceptions.py
new file mode 100644
index 0000000..6ae0341
--- /dev/null
+++ b/background_task/exceptions.py
@@ -0,0 +1,8 @@
+# -*- coding: utf-8 -*-
+
+
+class BackgroundTaskError(Exception):
+
+    def __init__(self, message, errors=None):
+        super(Exception, self).__init__(message)
+        self.errors = errors
diff --git a/background_task/management/__init__.py b/background_task/management/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/background_task/management/commands/__init__.py b/background_task/management/commands/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/background_task/management/commands/process_tasks.py b/background_task/management/commands/process_tasks.py
new file mode 100644
index 0000000..c349388
--- /dev/null
+++ b/background_task/management/commands/process_tasks.py
@@ -0,0 +1,101 @@
+# -*- coding: utf-8 -*-
+import logging
+import random
+import sys
+import time
+
+from django import VERSION
+from django.core.management.base import BaseCommand
+
+from background_task.tasks import tasks, autodiscover
+from background_task.utils import SignalManager
+from compat import close_connection
+
+
+logger = logging.getLogger(__name__)
+
+
+def _configure_log_std():
+    class StdOutWrapper(object):
+        def write(self, s):
+            logger.info(s)
+
+    class StdErrWrapper(object):
+        def write(self, s):
+            logger.error(s)
+    sys.stdout = StdOutWrapper()
+    sys.stderr = StdErrWrapper()
+
+
+class Command(BaseCommand):
+    help = 'Run tasks that are scheduled to run on the queue'
+
+    # Command options are specified in an abstract way to enable Django < 1.8 compatibility
+    OPTIONS = (
+        (('--duration', ), {
+            'action': 'store',
+            'dest': 'duration',
+            'type': int,
+            'default': 0,
+            'help': 'Run task for this many seconds (0 or less to run forever) - default is 0',
+        }),
+        (('--sleep', ), {
+            'action': 'store',
+            'dest': 'sleep',
+            'type': float,
+            'default': 5.0,
+            'help': 'Sleep for this many seconds before checking for new tasks (if none were found) - default is 5',
+        }),
+        (('--queue', ), {
+            'action': 'store',
+            'dest': 'queue',
+            'help': 'Only process tasks on this named queue',
+        }),
+        (('--log-std', ), {
+            'action': 'store_true',
+            'dest': 'log_std',
+            'help': 'Redirect stdout and stderr to the logging system',
+        }),
+
+    )
+
+    if VERSION < (1, 8):
+        from optparse import make_option
+        option_list = BaseCommand.option_list + tuple([make_option(*args, **kwargs) for args, kwargs in OPTIONS])
+
+    # Used in Django >= 1.8
+    def add_arguments(self, parser):
+        for (args, kwargs) in self.OPTIONS:
+            parser.add_argument(*args, **kwargs)
+
+    def __init__(self, *args, **kwargs):
+        super(Command, self).__init__(*args, **kwargs)
+        self._tasks = tasks
+
+    def handle(self, *args, **options):
+        duration = options.pop('duration', 0)
+        sleep = options.pop('sleep', 5.0)
+        queue = options.pop('queue', None)
+        log_std = options.pop('log_std', False)
+        sig_manager = SignalManager()
+
+        if log_std:
+            _configure_log_std()
+
+        autodiscover()
+
+        start_time = time.time()
+
+        while (duration <= 0) or (time.time() - start_time) <= duration:
+            if sig_manager.kill_now:
+                # shutting down gracefully
+                break
+
+            if not self._tasks.run_next_task(queue):
+                # there were no tasks in the queue, let's recover.
+                close_connection()
+                logger.debug('waiting for tasks')
+                time.sleep(sleep)
+            else:
+                # there were some tasks to process, let's check if there is more work to do after a little break.
+                time.sleep(random.uniform(sig_manager.time_to_wait[0], sig_manager.time_to_wait[1]))
diff --git a/background_task/migrations/0001_initial.py b/background_task/migrations/0001_initial.py
new file mode 100644
index 0000000..b71cdc3
--- /dev/null
+++ b/background_task/migrations/0001_initial.py
@@ -0,0 +1,65 @@
+# -*- coding: utf-8 -*-
+# Generated by Django 1.10.6 on 2017-04-03 21:42
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+import django.db.models.deletion
+
+
+class Migration(migrations.Migration):
+
+    initial = True
+
+    dependencies = [
+        ('contenttypes', '0002_remove_content_type_name'),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name='CompletedTask',
+            fields=[
+                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('task_name', models.CharField(db_index=True, max_length=255)),
+                ('task_params', models.TextField()),
+                ('task_hash', models.CharField(db_index=True, max_length=40)),
+                ('verbose_name', models.CharField(blank=True, max_length=255, null=True)),
+                ('priority', models.IntegerField(db_index=True, default=0)),
+                ('run_at', models.DateTimeField(db_index=True)),
+                ('repeat', models.BigIntegerField(choices=[(3600, 'hourly'), (86400, 'daily'), (604800, 'weekly'), (1209600, 'every 2 weeks'), (2419200, 'every 4 weeks'), (0, 'never')], default=0)),
+                ('repeat_until', models.DateTimeField(blank=True, null=True)),
+                ('queue', models.CharField(blank=True, db_index=True, max_length=255, null=True)),
+                ('attempts', models.IntegerField(db_index=True, default=0)),
+                ('failed_at', models.DateTimeField(blank=True, db_index=True, null=True)),
+                ('last_error', models.TextField(blank=True)),
+                ('locked_by', models.CharField(blank=True, db_index=True, max_length=64, null=True)),
+                ('locked_at', models.DateTimeField(blank=True, db_index=True, null=True)),
+                ('creator_object_id', models.PositiveIntegerField(blank=True, null=True)),
+                ('creator_content_type', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='completed_background_task', to='contenttypes.ContentType')),
+            ],
+        ),
+        migrations.CreateModel(
+            name='Task',
+            fields=[
+                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('task_name', models.CharField(db_index=True, max_length=255)),
+                ('task_params', models.TextField()),
+                ('task_hash', models.CharField(db_index=True, max_length=40)),
+                ('verbose_name', models.CharField(blank=True, max_length=255, null=True)),
+                ('priority', models.IntegerField(db_index=True, default=0)),
+                ('run_at', models.DateTimeField(db_index=True)),
+                ('repeat', models.BigIntegerField(choices=[(3600, 'hourly'), (86400, 'daily'), (604800, 'weekly'), (1209600, 'every 2 weeks'), (2419200, 'every 4 weeks'), (0, 'never')], default=0)),
+                ('repeat_until', models.DateTimeField(blank=True, null=True)),
+                ('queue', models.CharField(blank=True, db_index=True, max_length=255, null=True)),
+                ('attempts', models.IntegerField(db_index=True, default=0)),
+                ('failed_at', models.DateTimeField(blank=True, db_index=True, null=True)),
+                ('last_error', models.TextField(blank=True)),
+                ('locked_by', models.CharField(blank=True, db_index=True, max_length=64, null=True)),
+                ('locked_at', models.DateTimeField(blank=True, db_index=True, null=True)),
+                ('creator_object_id', models.PositiveIntegerField(blank=True, null=True)),
+                ('creator_content_type', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='background_task', to='contenttypes.ContentType')),
+            ],
+            options={
+                'db_table': 'background_task',
+            },
+        ),
+    ]
diff --git a/background_task/migrations/__init__.py b/background_task/migrations/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/background_task/models.py b/background_task/models.py
new file mode 100644
index 0000000..726d6ef
--- /dev/null
+++ b/background_task/models.py
@@ -0,0 +1,316 @@
+# -*- coding: utf-8 -*-
+from datetime import timedelta
+from hashlib import sha1
+import json
+import logging
+import os
+import traceback
+
+from compat import StringIO
+from compat.models import GenericForeignKey
+from django.contrib.contenttypes.models import ContentType
+from django.db import models
+from django.db.models import Q
+from django.utils import timezone
+from django.utils.six import python_2_unicode_compatible
+
+from background_task.settings import app_settings
+from background_task.signals import task_failed, task_rescheduled
+
+
+logger = logging.getLogger(__name__)
+
+
+# inspired by http://github.com/tobi/delayed_job
+#
+
+
+class TaskQuerySet(models.QuerySet):
+
+    def created_by(self, creator):
+        """
+        :return: A Task queryset filtered by creator
+        """
+        content_type = ContentType.objects.get_for_model(creator)
+        return self.filter(
+            creator_content_type=content_type,
+            creator_object_id=creator.id,
+        )
+
+
+class TaskManager(models.Manager):
+
+    def get_queryset(self):
+        return TaskQuerySet(self.model, using=self._db)
+
+    def created_by(self, creator):
+        return self.get_queryset().created_by(creator)
+
+    def find_available(self, queue=None):
+        now = timezone.now()
+        qs = self.unlocked(now)
+        if queue:
+            qs = qs.filter(queue=queue)
+        ready = qs.filter(run_at__lte=now, failed_at=None)
+        _priority_ordering = '{}priority'.format(app_settings.BACKGROUND_TASK_PRIORITY_ORDERING)
+        ready = ready.order_by(_priority_ordering, 'run_at')
+
+        if app_settings.BACKGROUND_TASK_RUN_ASYNC:
+            currently_locked = self.locked(now).count()
+            count = app_settings.BACKGROUND_TASK_ASYNC_THREADS - currently_locked
+            if count > 0:
+                ready = ready[:count]
+            else:
+                ready = self.none()
+
+        return ready
+
+    def unlocked(self, now):
+        max_run_time = app_settings.BACKGROUND_TASK_MAX_RUN_TIME
+        qs = self.get_queryset()
+        expires_at = now - timedelta(seconds=max_run_time)
+        unlocked = Q(locked_by=None) | Q(locked_at__lt=expires_at)
+        return qs.filter(unlocked)
+
+    def locked(self, now):
+        max_run_time = app_settings.BACKGROUND_TASK_MAX_RUN_TIME
+        qs = self.get_queryset()
+        expires_at = now - timedelta(seconds=max_run_time)
+        locked = Q(locked_by__isnull=False) | Q(locked_at__gt=expires_at)
+        return qs.filter(locked)
+
+    def new_task(self, task_name, args=None, kwargs=None,
+                 run_at=None, priority=0, queue=None, verbose_name=None, creator=None,
+                 repeat=None, repeat_until=None):
+        args = args or ()
+        kwargs = kwargs or {}
+        if run_at is None:
+            run_at = timezone.now()
+
+        task_params = json.dumps((args, kwargs), sort_keys=True)
+        s = "%s%s" % (task_name, task_params)
+        task_hash = sha1(s.encode('utf-8')).hexdigest()
+        return Task(task_name=task_name,
+                    task_params=task_params,
+                    task_hash=task_hash,
+                    priority=priority,
+                    run_at=run_at,
+                    queue=queue,
+                    verbose_name=verbose_name,
+                    creator=creator,
+                    repeat=repeat or Task.NEVER,
+                    repeat_until=repeat_until,
+                    )
+
+    def get_task(self, task_name, args=None, kwargs=None):
+        args = args or ()
+        kwargs = kwargs or {}
+        task_params = json.dumps((args, kwargs), sort_keys=True)
+        s = "%s%s" % (task_name, task_params)
+        task_hash = sha1(s.encode('utf-8')).hexdigest()
+        qs = self.get_queryset()
+        return qs.filter(task_hash=task_hash)
+
+    def drop_task(self, task_name, args=None, kwargs=None):
+        return self.get_task(task_name, args, kwargs).delete()
+
+
+ at python_2_unicode_compatible
+class Task(models.Model):
+    # the "name" of the task/function to be run
+    task_name = models.CharField(max_length=255, db_index=True)
+    # the json encoded parameters to pass to the task
+    task_params = models.TextField()
+    # a sha1 hash of the name and params, to lookup already scheduled tasks
+    task_hash = models.CharField(max_length=40, db_index=True)
+
+    verbose_name = models.CharField(max_length=255, null=True, blank=True)
+
+    # what priority the task has
+    priority = models.IntegerField(default=0, db_index=True)
+    # when the task should be run
+    run_at = models.DateTimeField(db_index=True)
+
+    # Repeat choices are encoded as number of seconds
+    # The repeat implementation is based on this encoding
+    HOURLY = 3600
+    DAILY = 24 * HOURLY
+    WEEKLY = 7 * DAILY
+    EVERY_2_WEEKS = 2 * WEEKLY
+    EVERY_4_WEEKS = 4 * WEEKLY
+    NEVER = 0
+    REPEAT_CHOICES = (
+        (HOURLY, 'hourly'),
+        (DAILY, 'daily'),
+        (WEEKLY, 'weekly'),
+        (EVERY_2_WEEKS, 'every 2 weeks'),
+        (EVERY_4_WEEKS, 'every 4 weeks'),
+        (NEVER, 'never'),
+    )
+    repeat = models.BigIntegerField(choices=REPEAT_CHOICES, default=NEVER)
+    repeat_until = models.DateTimeField(null=True, blank=True)
+
+    # the "name" of the queue this is to be run on
+    queue = models.CharField(max_length=255, db_index=True,
+                             null=True, blank=True)
+
+    # how many times the task has been tried
+    attempts = models.IntegerField(default=0, db_index=True)
+    # when the task last failed
+    failed_at = models.DateTimeField(db_index=True, null=True, blank=True)
+    # details of the error that occurred
+    last_error = models.TextField(blank=True)
+
+    # details of who's trying to run the task at the moment
+    locked_by = models.CharField(max_length=64, db_index=True,
+                                 null=True, blank=True)
+    locked_at = models.DateTimeField(db_index=True, null=True, blank=True)
+
+    creator_content_type = models.ForeignKey(
+        ContentType, null=True, blank=True,
+        related_name='background_task', on_delete=models.CASCADE
+    )
+    creator_object_id = models.PositiveIntegerField(null=True, blank=True)
+    creator = GenericForeignKey('creator_content_type', 'creator_object_id')
+
+    objects = TaskManager()
+
+    def locked_by_pid_running(self):
+        """
+        Check if the locked_by process is still running.
+        """
+        if self.locked_by:
+            try:
+                # won't kill the process. kill is a bad named system call
+                os.kill(int(self.locked_by), 0)
+                return True
+            except:
+                return False
+        else:
+            return None
+    locked_by_pid_running.boolean = True
+
+    def has_error(self):
+        """
+        Check if the last_error field is empty.
+        """
+        return bool(self.last_error)
+    has_error.boolean = True
+
+    def params(self):
+        args, kwargs = json.loads(self.task_params)
+        # need to coerce kwargs keys to str
+        kwargs = dict((str(k), v) for k, v in kwargs.items())
+        return args, kwargs
+
+    def lock(self, locked_by):
+        now = timezone.now()
+        unlocked = Task.objects.unlocked(now).filter(pk=self.pk)
+        updated = unlocked.update(locked_by=locked_by, locked_at=now)
+        if updated:
+            return Task.objects.get(pk=self.pk)
+        return None
+
+    def _extract_error(self, type, err, tb):
+        file = StringIO()
+        traceback.print_exception(type, err, tb, None, file)
+        return file.getvalue()
+
+    def increment_attempts(self):
+        self.attempts += 1
+        self.save()
+
+    def has_reached_max_attempts(self):
+        max_attempts = app_settings.BACKGROUND_TASK_MAX_ATTEMPTS
+        return self.attempts >= max_attempts
+
+    def is_repeating_task(self):
+        return self.repeat > self.NEVER
+
+    def reschedule(self, type, err, traceback):
+        '''
+        Set a new time to run the task in future, or create a CompletedTask and delete the Task
+        if it has reached the maximum of allowed attempts
+        '''
+        self.last_error = self._extract_error(type, err, traceback)
+        self.increment_attempts()
+        if self.has_reached_max_attempts():
+            self.failed_at = timezone.now()
+            logger.warning('Marking task %s as failed', self)
+            completed = self.create_completed_task()
+            task_failed.send(sender=self.__class__, task_id=self.id, completed_task=completed)
+            self.delete()
+        else:
+            backoff = timedelta(seconds=(self.attempts ** 4) + 5)
+            self.run_at = timezone.now() + backoff
+            logger.warning('Rescheduling task %s for %s later at %s', self,
+                backoff, self.run_at)
+            task_rescheduled.send(sender=self.__class__, task=self)
+            self.locked_by = None
+            self.locked_at = None
+            self.save()
+
+    def create_completed_task(self):
+        '''
+        Returns a new CompletedTask instance with the same values
+        '''
+        from background_task.models_completed import CompletedTask
+        completed_task = CompletedTask(
+            task_name=self.task_name,
+            task_params=self.task_params,
+            task_hash=self.task_hash,
+            priority=self.priority,
+            run_at=timezone.now(),
+            queue=self.queue,
+            attempts=self.attempts,
+            failed_at=self.failed_at,
+            last_error=self.last_error,
+            locked_by=self.locked_by,
+            locked_at=self.locked_at,
+            verbose_name=self.verbose_name,
+            creator=self.creator,
+            repeat=self.repeat,
+            repeat_until=self.repeat_until,
+        )
+        completed_task.save()
+        return completed_task
+
+    def create_repetition(self):
+        """
+        :return: A new Task with an offset of self.repeat, or None if the self.repeat_until is reached
+        """
+        if not self.is_repeating_task():
+            return None
+
+        if self.repeat_until and self.repeat_until <= timezone.now():
+            # Repeat chain completed
+            return None
+
+        args, kwargs = self.params()
+        new_run_at = self.run_at + timedelta(seconds=self.repeat)
+
+        new_task = TaskManager().new_task(
+            task_name=self.task_name,
+            args=args,
+            kwargs=kwargs,
+            run_at=new_run_at,
+            priority=self.priority,
+            queue=self.queue,
+            verbose_name=self.verbose_name,
+            creator=self.creator,
+            repeat=self.repeat,
+            repeat_until=self.repeat_until,
+        )
+        new_task.save()
+        return new_task
+
+    def save(self, *arg, **kw):
+        # force NULL rather than empty string
+        self.locked_by = self.locked_by or None
+        return super(Task, self).save(*arg, **kw)
+
+    def __str__(self):
+        return u'{}'.format(self.verbose_name or self.task_name)
+
+    class Meta:
+        db_table = 'background_task'
diff --git a/background_task/models_completed.py b/background_task/models_completed.py
new file mode 100644
index 0000000..1f7b444
--- /dev/null
+++ b/background_task/models_completed.py
@@ -0,0 +1,124 @@
+# -*- coding: utf-8 -*-
+import os
+
+from compat.models import GenericForeignKey
+from django.contrib.contenttypes.models import ContentType
+
+from django.db import models
+from django.utils import timezone
+from django.utils.six import python_2_unicode_compatible
+
+from background_task.models import Task
+
+
+class CompletedTaskQuerySet(models.QuerySet):
+
+    def created_by(self, creator):
+        """
+        :return: A CompletedTask queryset filtered by creator
+        """
+        content_type = ContentType.objects.get_for_model(creator)
+        return self.filter(
+            creator_content_type=content_type,
+            creator_object_id=creator.id,
+        )
+
+    def failed(self, within=None):
+        """
+        :param within: A timedelta object
+        :return: A queryset of CompletedTasks that failed within the given timeframe (e.g. less than 1h ago)
+        """
+        qs = self.filter(
+            failed_at__isnull=False,
+        )
+        if within:
+            time_limit = timezone.now() - within
+            qs = qs.filter(failed_at__gt=time_limit)
+        return qs
+
+    def succeeded(self, within=None):
+        """
+        :param within: A timedelta object
+        :return: A queryset of CompletedTasks that completed successfully within the given timeframe
+        (e.g. less than 1h ago)
+        """
+        qs = self.filter(
+            failed_at__isnull=True,
+        )
+        if within:
+            time_limit = timezone.now() - within
+            qs = qs.filter(run_at__gt=time_limit)
+        return qs
+
+
+ at python_2_unicode_compatible
+class CompletedTask(models.Model):
+    # the "name" of the task/function to be run
+    task_name = models.CharField(max_length=255, db_index=True)
+    # the json encoded parameters to pass to the task
+    task_params = models.TextField()
+    # a sha1 hash of the name and params, to lookup already scheduled tasks
+    task_hash = models.CharField(max_length=40, db_index=True)
+
+    verbose_name = models.CharField(max_length=255, null=True, blank=True)
+
+    # what priority the task has
+    priority = models.IntegerField(default=0, db_index=True)
+    # when the task should be run
+    run_at = models.DateTimeField(db_index=True)
+
+    repeat = models.BigIntegerField(choices=Task.REPEAT_CHOICES, default=Task.NEVER)
+    repeat_until = models.DateTimeField(null=True, blank=True)
+
+    # the "name" of the queue this is to be run on
+    queue = models.CharField(max_length=255, db_index=True,
+                             null=True, blank=True)
+
+    # how many times the task has been tried
+    attempts = models.IntegerField(default=0, db_index=True)
+    # when the task last failed
+    failed_at = models.DateTimeField(db_index=True, null=True, blank=True)
+    # details of the error that occurred
+    last_error = models.TextField(blank=True)
+
+    # details of who's trying to run the task at the moment
+    locked_by = models.CharField(max_length=64, db_index=True,
+                                 null=True, blank=True)
+    locked_at = models.DateTimeField(db_index=True, null=True, blank=True)
+
+    creator_content_type = models.ForeignKey(
+        ContentType, null=True, blank=True,
+        related_name='completed_background_task', on_delete=models.CASCADE
+    )
+    creator_object_id = models.PositiveIntegerField(null=True, blank=True)
+    creator = GenericForeignKey('creator_content_type', 'creator_object_id')
+
+    objects = CompletedTaskQuerySet.as_manager()
+
+    def locked_by_pid_running(self):
+        """
+        Check if the locked_by process is still running.
+        """
+        if self.locked_by:
+            try:
+                # won't kill the process. kill is a bad named system call
+                os.kill(int(self.locked_by), 0)
+                return True
+            except:
+                return False
+        else:
+            return None
+    locked_by_pid_running.boolean = True
+
+    def has_error(self):
+        """
+        Check if the last_error field is empty.
+        """
+        return bool(self.last_error)
+    has_error.boolean = True
+
+    def __str__(self):
+        return u'{} - {}'.format(
+            self.verbose_name or self.task_name,
+            self.run_at,
+        )
diff --git a/background_task/settings.py b/background_task/settings.py
new file mode 100644
index 0000000..6a60812
--- /dev/null
+++ b/background_task/settings.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+import multiprocessing
+
+from django.conf import settings
+
+try:
+    cpu_count = multiprocessing.cpu_count()
+except Exception:
+    cpu_count = 1
+
+
+class AppSettings(object):
+    """
+    """
+    @property
+    def MAX_ATTEMPTS(self):
+        """Control how many times a task will be attempted."""
+        return getattr(settings, 'MAX_ATTEMPTS', 25)
+
+    @property
+    def BACKGROUND_TASK_MAX_ATTEMPTS(self):
+        """Control how many times a task will be attempted."""
+        return self.MAX_ATTEMPTS
+
+    @property
+    def MAX_RUN_TIME(self):
+        """Maximum possible task run time, after which tasks will be unlocked and tried again."""
+        return getattr(settings, 'MAX_RUN_TIME', 3600)
+
+    @property
+    def BACKGROUND_TASK_MAX_RUN_TIME(self):
+        """Maximum possible task run time, after which tasks will be unlocked and tried again."""
+        return self.MAX_RUN_TIME
+
+    @property
+    def BACKGROUND_TASK_RUN_ASYNC(self):
+        """Control if tasks will run asynchronous in a ThreadPool."""
+        return getattr(settings, 'BACKGROUND_TASK_RUN_ASYNC', False)
+
+    @property
+    def BACKGROUND_TASK_ASYNC_THREADS(self):
+        """Specify number of concurrent threads."""
+        return getattr(settings, 'BACKGROUND_TASK_ASYNC_THREADS', cpu_count)
+
+    @property
+    def BACKGROUND_TASK_PRIORITY_ORDERING(self):
+        """
+        Control the ordering of tasks in the queue.
+        Choose either `DESC` or `ASC`.
+
+        https://en.m.wikipedia.org/wiki/Nice_(Unix)
+        A niceness of −20 is the highest priority and 19 is the lowest priority. The default niceness for processes is inherited from its parent process and is usually 0.
+        """
+        order = getattr(settings, 'BACKGROUND_TASK_PRIORITY_ORDERING', 'DESC')
+        if order == 'ASC':
+            prefix = ''
+        else:
+            prefix = '-'
+        return prefix
+
+app_settings = AppSettings()
diff --git a/background_task/signals.py b/background_task/signals.py
new file mode 100644
index 0000000..f927d7f
--- /dev/null
+++ b/background_task/signals.py
@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+import django.dispatch
+from django.db import connections
+from background_task.settings import app_settings
+
+task_created = django.dispatch.Signal(providing_args=['task'])
+task_error = django.dispatch.Signal(providing_args=['task'])
+task_rescheduled = django.dispatch.Signal(providing_args=['task'])
+task_failed = django.dispatch.Signal(providing_args=['task_id', 'completed_task'])
+task_successful = django.dispatch.Signal(providing_args=['task_id', 'completed_task'])
+task_started = django.dispatch.Signal()
+task_finished = django.dispatch.Signal()
+
+
+# Register an event to reset saved queries when a Task is started.
+def reset_queries(**kwargs):
+    if app_settings.BACKGROUND_TASK_RUN_ASYNC:
+        for conn in connections.all():
+            conn.queries_log.clear()
+
+
+task_started.connect(reset_queries)
+
+
+# Register an event to reset transaction state and close connections past
+# their lifetime.
... 1451 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/django-background-tasks.git



More information about the Python-modules-commits mailing list