[Python-modules-commits] [django-q] 01/03: Import django-q_0.8.orig.tar.gz
Jonas Meurer
mejo at moszumanska.debian.org
Sat Sep 16 09:18:39 UTC 2017
This is an automated email from the git hooks/post-receive script.
mejo pushed a commit to branch master
in repository django-q.
commit cc6b322822ee9d4d0eaae24ec57c3ded39aa26ae
Author: Jonas Meurer <jonas at freesources.org>
Date: Sat Sep 16 02:57:39 2017 +0200
Import django-q_0.8.orig.tar.gz
---
.gitignore | 68 +
.travis.yml | 49 +
LICENSE | 22 +
MANIFEST.in | 6 +
README.rst | 215 ++
django_q/__init__.py | 22 +
django_q/admin.py | 125 +
django_q/apps.py | 11 +
django_q/brokers/__init__.py | 190 ++
django_q/brokers/aws_sqs.py | 60 +
django_q/brokers/disque.py | 65 +
django_q/brokers/ironmq.py | 50 +
django_q/brokers/mongo.py | 73 +
django_q/brokers/orm.py | 73 +
django_q/brokers/redis_broker.py | 62 +
django_q/cluster.py | 596 ++++
django_q/conf.py | 200 ++
django_q/humanhash.py | 143 +
django_q/management/__init__.py | 0
django_q/management/commands/__init__.py | 0
django_q/management/commands/qcluster.py | 24 +
django_q/management/commands/qinfo.py | 41 +
django_q/management/commands/qmonitor.py | 21 +
django_q/migrations/0001_initial.py | 68 +
django_q/migrations/0002_auto_20150630_1624.py | 24 +
django_q/migrations/0003_auto_20150708_1326.py | 31 +
django_q/migrations/0004_auto_20150710_1043.py | 26 +
django_q/migrations/0005_auto_20150718_1506.py | 24 +
django_q/migrations/0006_auto_20150805_1817.py | 24 +
django_q/migrations/0007_ormq.py | 27 +
django_q/migrations/0008_auto_20160224_1026.py | 19 +
django_q/migrations/__init__.py | 0
django_q/models.py | 209 ++
django_q/monitor.py | 200 ++
django_q/signals.py | 30 +
django_q/signing.py | 44 +
django_q/status.py | 113 +
django_q/tasks.py | 682 +++++
django_q/tests/__init__.py | 0
django_q/tests/settings.py | 122 +
django_q/tests/tasks.py | 46 +
django_q/tests/test_admin.py | 80 +
django_q/tests/test_brokers.py | 346 +++
django_q/tests/test_cached.py | 188 ++
django_q/tests/test_cluster.py | 391 +++
django_q/tests/test_commands.py | 18 +
django_q/tests/test_monitor.py | 48 +
django_q/tests/test_scheduler.py | 134 +
django_q/tests/urls.py | 6 +
docs/Makefile | 192 ++
docs/_static/cluster.png | Bin 0 -> 63139 bytes
docs/_static/favicon.ico | Bin 0 -> 15086 bytes
docs/_static/info.png | Bin 0 -> 11309 bytes
docs/_static/logo.png | Bin 0 -> 6134 bytes
docs/_static/logo_large.png | Bin 0 -> 28750 bytes
docs/_static/monitor.png | Bin 0 -> 9354 bytes
docs/_static/scheduled.png | Bin 0 -> 50719 bytes
docs/_static/successful.png | Bin 0 -> 77776 bytes
docs/admin.rst | 89 +
docs/architecture.rst | 91 +
docs/brokers.rst | 193 ++
docs/chain.rst | 105 +
docs/cluster.rst | 149 +
docs/conf.py | 319 +++
docs/configure.rst | 419 +++
docs/examples.rst | 317 +++
docs/group.rst | 125 +
docs/index.rst | 51 +
docs/install.rst | 136 +
docs/iterable.rst | 109 +
docs/monitor.rst | 184 ++
docs/schedules.rst | 210 ++
docs/signals.rst | 43 +
docs/tasks.rst | 489 ++++
pytest.ini | 2 +
requirements.in | 12 +
requirements.txt | 27 +
runtests.py | 3447 ++++++++++++++++++++++++
setup.cfg | 2 +
setup.py | 60 +
80 files changed, 11787 insertions(+)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..85c9a05
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,68 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*,cover
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+dev-requirements.txt
+dev-requirements.in
+manage.py
+db.sqlite3
+*.ipynb
+*.rdb
+.venv
+.idea
+djq
+node_modules
\ No newline at end of file
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..128ef22
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,49 @@
+language: python
+
+services:
+ - redis-server
+ - mongodb
+
+python:
+ - "2.7"
+ - "3.6"
+
+env:
+ - DJANGO=1.11
+ - DJANGO=1.10.7
+ - DJANGO=1.8.18
+
+sudo: false
+
+addons:
+ apt:
+ packages:
+ - tcl8.5
+
+before_script:
+ - git clone https://github.com/antirez/disque.git disque_server
+ - "cd disque_server/src && make && PREFIX=../ make install && cd -"
+ - "./disque_server/bin/disque-server &"
+ - ./disque_server/bin/disque PING
+
+install:
+ - pip install -q django==$DJANGO
+ - pip install -r requirements.txt
+ - pip install pytest --upgrade
+ - pip install pytest-django codecov sphinx
+ - python setup.py install
+
+script:
+ - coverage run --source=django_q -m py.test
+ - sphinx-build -b html -d docs/_build/doctrees -nW docs docs/_build/html
+
+after_success:
+ - codecov
+
+notifications:
+ webhooks:
+ urls:
+ - https://webhooks.gitter.im/e/cbcff78c4be241602332
+ on_success: change
+ on_failure: always
+ on_start: never
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..e12eb80
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,22 @@
+The MIT License (MIT)
+
+Copyright (c) 2015 Ilan Steemers
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..b3169f4
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,6 @@
+include LICENSE
+include README.rst
+include django_q/management/*.py
+include django_q/management/commands/*.py
+include django_q/migrations/*.py
+include django_q/brokers/*.py
\ No newline at end of file
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..a7b9a72
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,215 @@
+.. image:: docs/_static/logo.png
+ :align: center
+ :alt: Q logo
+ :target: https://django-q.readthedocs.org/
+
+A multiprocessing distributed task queue for Django
+---------------------------------------------------
+
+|image0| |image1| |docs| |image2|
+
+Features
+~~~~~~~~
+
+- Multiprocessing worker pool
+- Asynchronous tasks
+- Scheduled and repeated tasks
+- Encrypted and compressed packages
+- Failure and success database or cache
+- Result hooks, groups and chains
+- Django Admin integration
+- PaaS compatible with multiple instances
+- Multi cluster monitor
+- Redis, Disque, IronMQ, SQS, MongoDB or ORM
+- Rollbar support
+
+Requirements
+~~~~~~~~~~~~
+
+- `Django <https://www.djangoproject.com>`__ >= 1.8
+- `Django-picklefield <https://github.com/gintas/django-picklefield>`__
+- `Arrow <https://github.com/crsmithdev/arrow>`__
+- `Blessed <https://github.com/jquast/blessed>`__
+
+Tested with: Python 2.7 & 3.6. Django 1.8.18, 1.10.7 and 1.11
+
+Brokers
+~~~~~~~
+- `Redis <https://django-q.readthedocs.org/en/latest/brokers.html#redis>`__
+- `Disque <https://django-q.readthedocs.org/en/latest/brokers.html#disque>`__
+- `IronMQ <https://django-q.readthedocs.org/en/latest/brokers.html#ironmq>`__
+- `Amazon SQS <https://django-q.readthedocs.org/en/latest/brokers.html#amazon-sqs>`__
+- `MongoDB <https://django-q.readthedocs.org/en/latest/brokers.html#mongodb>`__
+- `Django ORM <https://django-q.readthedocs.org/en/latest/brokers.html#django-orm>`__
+
+Installation
+~~~~~~~~~~~~
+
+- Install the latest version with pip::
+
+ $ pip install django-q
+
+
+- Add `django_q` to your `INSTALLED_APPS` in your project's `settings.py`::
+
+ INSTALLED_APPS = (
+ # other apps
+ 'django_q',
+ )
+
+- Run Django migrations to create the database tables::
+
+ $ python manage.py migrate
+
+- Choose a message `broker <https://django-q.readthedocs.org/en/latest/brokers.html>`__, configure it and install the appropriate client library (see the example below).
+
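+For example, the default Redis broker uses the ``redis`` client library (a
+minimal sketch; the package to install depends on the broker you choose)::
+
+ $ pip install redis
+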
+Read the full documentation at `https://django-q.readthedocs.org <https://django-q.readthedocs.org>`__
+
+
+Configuration
+~~~~~~~~~~~~~
+
+All configuration settings are optional, e.g.:
+
+.. code:: python
+
+ # settings.py example
+ Q_CLUSTER = {
+ 'name': 'myproject',
+ 'workers': 8,
+ 'recycle': 500,
+ 'timeout': 60,
+ 'compress': True,
+ 'cpu_affinity': 1,
+ 'save_limit': 250,
+ 'queue_limit': 500,
+ 'label': 'Django Q',
+ 'redis': {
+ 'host': '127.0.0.1',
+ 'port': 6379,
+ 'db': 0, }
+ }
+
+For full configuration options, see the `configuration documentation <https://django-q.readthedocs.org/en/latest/configure.html>`__.
+
+Management Commands
+~~~~~~~~~~~~~~~~~~~
+
+Start a cluster with::
+
+ $ python manage.py qcluster
+
+Monitor your clusters with::
+
+ $ python manage.py qmonitor
+
+Check overall statistics with::
+
+ $ python manage.py qinfo
+
+Creating Tasks
+~~~~~~~~~~~~~~
+
+Use `async` from your code to quickly offload tasks:
+
+.. code:: python
+
+ from django_q.tasks import async, result
+
+ # create the task
+ async('math.copysign', 2, -2)
+
+ # or with a reference
+ from math import copysign
+
+ task_id = async(copysign, 2, -2)
+
+ # get the result
+ task_result = result(task_id)
+
+ # result returns None if the task has not been executed yet
+ # you can wait for it
+ task_result = result(task_id, 200)
+
+ # but in most cases you will want to use a hook:
+
+ async('math.modf', 2.5, hook='hooks.print_result')
+
+ # hooks.py
+ def print_result(task):
+ print(task.result)
+
+For more info see `Tasks <https://django-q.readthedocs.org/en/latest/tasks.html>`__
+
+
+Schedule
+~~~~~~~~
+
+Schedules are regular Django models. You can manage them through the
+Admin page or directly from your code:
+
+.. code:: python
+
+ # Use the schedule function
+ from django_q.tasks import schedule
+ from django_q.models import Schedule
+
+ schedule('math.copysign',
+ 2, -2,
+ hook='hooks.print_result',
+ schedule_type=Schedule.DAILY)
+
+ # Or create the object directly
+ from django_q.models import Schedule
+
+ Schedule.objects.create(func='math.copysign',
+ hook='hooks.print_result',
+ args='2,-2',
+ schedule_type=Schedule.DAILY
+ )
+
+ # Run a task every 5 minutes, starting at 6 PM (18:00) today
+ # for 2 hours
+ import arrow
+
+ schedule('math.hypot',
+ 3, 4,
+ schedule_type=Schedule.MINUTES,
+ minutes=5,
+ repeats=24,
+ next_run=arrow.utcnow().replace(hour=18, minute=0))
+
+For more info check the `Schedules <https://django-q.readthedocs.org/en/latest/schedules.html>`__ documentation.
+
+
+Testing
+~~~~~~~
+
+To run the tests you will need `py.test <http://pytest.org/latest/>`__ and `pytest-django <https://github.com/pytest-dev/pytest-django>`__.
+
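+A minimal sketch of a test run (assuming a local Redis server and the bundled
+``pytest.ini``)::
+
+ $ pip install pytest pytest-django
+ $ py.test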
+
+Todo
+~~~~
+
+- Better tests and coverage
+- Fewer dependencies?
+
+Acknowledgements
+~~~~~~~~~~~~~~~~
+
+- Django Q was inspired by working with
+ `Django-RQ <https://github.com/ui/django-rq>`__ and
+ `RQ <https://github.com/rq/rq>`__
+- Human readable hashes by
+ `HumanHash <https://github.com/zacharyvoase/humanhash>`__
+- Redditors feedback at `r/django <https://www.reddit.com/r/django/>`__
+
+.. |image0| image:: https://travis-ci.org/Koed00/django-q.svg?branch=master
+ :target: https://travis-ci.org/Koed00/django-q
+.. |image1| image:: http://codecov.io/github/Koed00/django-q/coverage.svg?branch=master
+ :target: http://codecov.io/github/Koed00/django-q?branch=master
+.. |image2| image:: http://badges.gitter.im/Join%20Chat.svg
+ :target: https://gitter.im/Koed00/django-q
+.. |docs| image:: https://readthedocs.org/projects/docs/badge/?version=latest
+ :alt: Documentation Status
+ :scale: 100
+ :target: https://django-q.readthedocs.org/
diff --git a/django_q/__init__.py b/django_q/__init__.py
new file mode 100644
index 0000000..4f3b689
--- /dev/null
+++ b/django_q/__init__.py
@@ -0,0 +1,22 @@
+import os
+import sys
+from django import get_version
+
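+# the package directory is added to sys.path, presumably so the bundled tests
+# and their settings module resolve when running from a source checkout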
+myPath = os.path.dirname(os.path.abspath(__file__))
+sys.path.insert(0, myPath)
+
+VERSION = (0, 8, 0)
+
+default_app_config = 'django_q.apps.DjangoQConfig'
+
+# root imports will slowly be deprecated.
+# please import from the relevant sub modules
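+# on Django 1.9, 1.10 and 1.11 the root-level re-exports below are skipped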
+split_version = get_version().split('.')
+if split_version[1] not in ('9', '10', '11'):
+ from .tasks import async, schedule, result, result_group, fetch, fetch_group, count_group, delete_group, queue_size
+ from .models import Task, Schedule, Success, Failure
+ from .cluster import Cluster
+ from .status import Stat
+ from .brokers import get_broker
+
+__all__ = ['conf', 'cluster', 'models', 'tasks']
diff --git a/django_q/admin.py b/django_q/admin.py
new file mode 100644
index 0000000..36f70a1
--- /dev/null
+++ b/django_q/admin.py
@@ -0,0 +1,125 @@
+"""Admin module for Django."""
+from django.contrib import admin
+from django.utils.translation import ugettext_lazy as _
+
+from .tasks import async
+from .models import Success, Failure, Schedule, OrmQ
+from .conf import Conf
+
+
+class TaskAdmin(admin.ModelAdmin):
+ """model admin for success tasks."""
+
+ list_display = (
+ u'name',
+ 'func',
+ 'started',
+ 'stopped',
+ 'time_taken',
+ 'group'
+ )
+
+ def has_add_permission(self, request, obj=None):
+ """Don't allow adds."""
+ return False
+
+ def get_queryset(self, request):
+ """Only show successes."""
+ qs = super(TaskAdmin, self).get_queryset(request)
+ return qs.filter(success=True)
+
+ search_fields = ('name', 'func', 'group')
+ readonly_fields = []
+ list_filter = ('group',)
+
+ def get_readonly_fields(self, request, obj=None):
+ """Set all fields readonly."""
+ return list(self.readonly_fields) + \
+ [field.name for field in obj._meta.fields]
+
+
+def retry_failed(FailAdmin, request, queryset):
+ """Submit selected tasks back to the queue."""
+ for task in queryset:
+ async(task.func, *task.args or (), hook=task.hook, **task.kwargs or {})
+ task.delete()
+
+
+retry_failed.short_description = _("Resubmit selected tasks to queue")
+
+
+class FailAdmin(admin.ModelAdmin):
+ """model admin for failed tasks."""
+
+ list_display = (
+ 'name',
+ 'func',
+ 'started',
+ 'stopped',
+ 'result'
+ )
+
+ def has_add_permission(self, request, obj=None):
+ """Don't allow adds."""
+ return False
+
+ actions = [retry_failed]
+ search_fields = ('name', 'func')
+ list_filter = ('group',)
+ readonly_fields = []
+
+ def get_readonly_fields(self, request, obj=None):
+ """Set all fields readonly."""
+ return list(self.readonly_fields) + \
+ [field.name for field in obj._meta.fields]
+
+
+class ScheduleAdmin(admin.ModelAdmin):
+ """ model admin for schedules """
+
+ list_display = (
+ 'id',
+ 'name',
+ 'func',
+ 'schedule_type',
+ 'repeats',
+ 'next_run',
+ 'last_run',
+ 'success'
+ )
+
+ list_filter = ('next_run', 'schedule_type')
+ search_fields = ('func',)
+ list_display_links = ('id', 'name')
+
+
+class QueueAdmin(admin.ModelAdmin):
+ """ queue admin for ORM broker """
+ list_display = (
+ 'id',
+ 'key',
+ 'task_id',
+ 'name',
+ 'func',
+ 'lock'
+ )
+
+ def save_model(self, request, obj, form, change):
+ obj.save(using=Conf.ORM)
+
+ def delete_model(self, request, obj):
+ obj.delete(using=Conf.ORM)
+
+ def get_queryset(self, request):
+ return super(QueueAdmin, self).get_queryset(request).using(Conf.ORM)
+
+ def has_add_permission(self, request, obj=None):
+ """Don't allow adds."""
+ return False
+
+admin.site.register(Schedule, ScheduleAdmin)
+admin.site.register(Success, TaskAdmin)
+admin.site.register(Failure, FailAdmin)
+
+if Conf.ORM or Conf.TESTING:
+ admin.site.register(OrmQ, QueueAdmin)
diff --git a/django_q/apps.py b/django_q/apps.py
new file mode 100644
index 0000000..d53a1e9
--- /dev/null
+++ b/django_q/apps.py
@@ -0,0 +1,11 @@
+from django.apps import AppConfig
+
+from .conf import Conf
+
+
+class DjangoQConfig(AppConfig):
+ name = 'django_q'
+ verbose_name = Conf.LABEL
+
+ def ready(self):
+ from django_q.signals import call_hook
\ No newline at end of file
diff --git a/django_q/brokers/__init__.py b/django_q/brokers/__init__.py
new file mode 100644
index 0000000..395a81c
--- /dev/null
+++ b/django_q/brokers/__init__.py
@@ -0,0 +1,190 @@
+import importlib
+
+from django.core.cache import caches, InvalidCacheBackendError
+
+from django_q.conf import Conf
+
+
+class Broker(object):
+ def __init__(self, list_key=Conf.PREFIX):
+ self.connection = self.get_connection(list_key)
+ self.list_key = list_key
+ self.cache = self.get_cache()
+ self._info = None
+
+ def enqueue(self, task):
+ """
+ Puts a task onto the queue
+ :type task: str
+ :return: task id
+ """
+ pass
+
+ def dequeue(self):
+ """
+ Gets a task from the queue
+ :return: tuple with task id and task message
+ """
+ pass
+
+ def queue_size(self):
+ """
+ :return: the number of tasks in the queue
+ """
+ pass
+
+ def lock_size(self):
+ """
+ :return: the number of tasks currently awaiting acknowledgement
+ """
+
+ def delete_queue(self):
+ """
+ Deletes the queue from the broker
+ """
+ pass
+
+ def purge_queue(self):
+ """
+ Purges the queue of any tasks
+ """
+ pass
+
+ def delete(self, task_id):
+ """
+ Deletes a task from the queue
+ :param task_id: the id of the task
+ """
+ pass
+
+ def acknowledge(self, task_id):
+ """
+ Acknowledges completion of the task and removes it from the queue.
+ :param task_id: the id of the task
+ """
+ pass
+
+ def fail(self, task_id):
+ """
+ Fails a task message
+ :param task_id:
+ :return:
+ """
+
+ def ping(self):
+ """
+ Checks whether the broker connection is available
+ :rtype: bool
+ """
+ pass
+
+ def info(self):
+ """
+ Shows the broker type
+ """
+ return self._info
+
+ def set_stat(self, key, value, timeout):
+ """
+ Saves a cluster statistic to the cache provider
+ :type key: str
+ :type value: str
+ :type timeout: int
+ """
+ if not self.cache:
+ return
+ key_list = self.cache.get(Conf.Q_STAT, [])
+ if key not in key_list:
+ key_list.append(key)
+ self.cache.set(Conf.Q_STAT, key_list)
+ return self.cache.set(key, value, timeout)
+
+ def get_stat(self, key):
+ """
+ Gets a cluster statistic from the cache provider
+ :type key: str
+ :return: a cluster Stat
+ """
+ if not self.cache:
+ return
+ return self.cache.get(key)
+
+ def get_stats(self, pattern):
+ """
+ Returns a list of all cluster stats from the cache provider
+ :type pattern: str
+ :return: a list of Stats
+ """
+ if not self.cache:
+ return
+ key_list = self.cache.get(Conf.Q_STAT)
+ if not key_list or len(key_list) == 0:
+ return []
+ stats = []
+ for key in key_list:
+ stat = self.cache.get(key)
+ if stat:
+ stats.append(stat)
+ else:
+ key_list.remove(key)
+ self.cache.set(Conf.Q_STAT, key_list)
+ return stats
+
+ @staticmethod
+ def get_cache():
+ """
+ Gets the current cache provider
+ :return: a cache provider
+ """
+ try:
+ return caches[Conf.CACHE]
+ except InvalidCacheBackendError:
+ return None
+
+ @staticmethod
+ def get_connection(list_key=Conf.PREFIX):
+ """
+ Gets a connection to the broker
+ :param list_key: Optional queue name
+ :return: a broker connection
+ """
+ return 0
+
+
+def get_broker(list_key=Conf.PREFIX):
+ """
+ Gets the configured broker type
+ :param list_key: optional queue name
+ :type list_key: str
+ :return:
+ """
+ # custom
+ if Conf.BROKER_CLASS:
+ module, func = Conf.BROKER_CLASS.rsplit('.', 1)
+ m = importlib.import_module(module)
+ broker = getattr(m, func)
+ return broker(list_key=list_key)
+ # disque
+ elif Conf.DISQUE_NODES:
+ from brokers import disque
+ return disque.Disque(list_key=list_key)
+ # Iron MQ
+ elif Conf.IRON_MQ:
+ from brokers import ironmq
+ return ironmq.IronMQBroker(list_key=list_key)
+ # SQS
+ elif Conf.SQS:
+ from brokers import aws_sqs
+ return aws_sqs.Sqs(list_key=list_key)
+ # ORM
+ elif Conf.ORM:
+ from brokers import orm
+ return orm.ORM(list_key=list_key)
+ # Mongo
+ elif Conf.MONGO:
+ from brokers import mongo
+ return mongo.Mongo(list_key=list_key)
+ # default to redis
+ else:
+ from brokers import redis_broker
+ return redis_broker.Redis(list_key=list_key)
diff --git a/django_q/brokers/aws_sqs.py b/django_q/brokers/aws_sqs.py
new file mode 100644
index 0000000..111e631
--- /dev/null
+++ b/django_q/brokers/aws_sqs.py
@@ -0,0 +1,60 @@
+from django_q.conf import Conf
+from django_q.brokers import Broker
+from boto3 import Session
+
+
+class Sqs(Broker):
+ def __init__(self, list_key=Conf.PREFIX):
+ self.sqs = None
+ super(Sqs, self).__init__(list_key)
+ self.queue = self.get_queue()
+
+ def enqueue(self, task):
+ response = self.queue.send_message(MessageBody=task)
+ return response.get('MessageId')
+
+ def dequeue(self):
+ # sqs supports max 10 messages in bulk
+ if Conf.BULK > 10:
+ Conf.BULK = 10
+ tasks = self.queue.receive_messages(MaxNumberOfMessages=Conf.BULK, VisibilityTimeout=Conf.RETRY)
+ if tasks:
+ return [(t.receipt_handle, t.body) for t in tasks]
+
+ def acknowledge(self, task_id):
+ return self.delete(task_id)
+
+ def queue_size(self):
+ return int(self.queue.attributes['ApproximateNumberOfMessages'])
+
+ def lock_size(self):
+ return int(self.queue.attributes['ApproximateNumberOfMessagesNotVisible'])
+
+ def delete(self, task_id):
+ message = self.sqs.Message(self.queue.url, task_id)
+ message.delete()
+
+ def fail(self, task_id):
+ self.delete(task_id)
+
+ def delete_queue(self):
+ self.queue.delete()
+
+ def purge_queue(self):
+ self.queue.purge()
+
+ def ping(self):
+ return 'sqs' in self.connection.get_available_resources()
+
+ def info(self):
+ return 'AWS SQS'
+
+ @staticmethod
+ def get_connection(list_key=Conf.PREFIX):
+ return Session(aws_access_key_id=Conf.SQS['aws_access_key_id'],
+ aws_secret_access_key=Conf.SQS['aws_secret_access_key'],
+ region_name=Conf.SQS['aws_region'])
+
+ def get_queue(self):
+ self.sqs = self.connection.resource('sqs')
+ return self.sqs.create_queue(QueueName=self.list_key)
diff --git a/django_q/brokers/disque.py b/django_q/brokers/disque.py
new file mode 100644
index 0000000..1fc4bf3
--- /dev/null
+++ b/django_q/brokers/disque.py
@@ -0,0 +1,65 @@
+import random
+import redis
+from django_q.brokers import Broker
+from django_q.conf import Conf
+
+
+class Disque(Broker):
+ def enqueue(self, task):
+ retry = Conf.RETRY if Conf.RETRY > 0 else '{} REPLICATE 1'.format(Conf.RETRY)
+ return self.connection.execute_command(
+ 'ADDJOB {} {} 500 RETRY {}'.format(self.list_key, task, retry)).decode()
+
+ def dequeue(self):
+ tasks = self.connection.execute_command(
+ 'GETJOB COUNT {} TIMEOUT 1000 FROM {}'.format(Conf.BULK, self.list_key))
+ if tasks:
+ return [(t[1].decode(), t[2].decode()) for t in tasks]
+
+ def queue_size(self):
+ return self.connection.execute_command('QLEN {}'.format(self.list_key))
+
+ def acknowledge(self, task_id):
+ command = 'FASTACK' if Conf.DISQUE_FASTACK else 'ACKJOB'
+ return self.connection.execute_command('{} {}'.format(command,task_id))
+
+ def ping(self):
+ return self.connection.execute_command('HELLO')[0] > 0
+
+ def delete(self, task_id):
+ return self.connection.execute_command('DELJOB {}'.format(task_id))
+
+ def fail(self, task_id):
+ return self.delete(task_id)
+
+ def delete_queue(self):
+ jobs = self.connection.execute_command('JSCAN QUEUE {}'.format(self.list_key))[1]
+ if jobs:
+ job_ids = ' '.join(jid.decode() for jid in jobs)
+ self.connection.execute_command('DELJOB {}'.format(job_ids))
+ return len(jobs)
+
+ def info(self):
+ if not self._info:
+ info = self.connection.info('server')
+ self._info= 'Disque {}'.format(info['disque_version'])
+ return self._info
+
+ @staticmethod
+ def get_connection(list_key=Conf.PREFIX):
+ # randomize nodes
+ random.shuffle(Conf.DISQUE_NODES)
+ # find one that works
+ for node in Conf.DISQUE_NODES:
+ host, port = node.split(':')
+ kwargs = {'host': host, 'port': port}
+ if Conf.DISQUE_AUTH:
+ kwargs['password'] = Conf.DISQUE_AUTH
+ redis_client = redis.Redis(**kwargs)
+ redis_client.decode_responses = True
+ try:
+ redis_client.execute_command('HELLO')
+ return redis_client
+ except redis.exceptions.ConnectionError:
+ continue
+ raise redis.exceptions.ConnectionError('Could not connect to any Disque nodes')
diff --git a/django_q/brokers/ironmq.py b/django_q/brokers/ironmq.py
new file mode 100644
index 0000000..f0b0725
--- /dev/null
+++ b/django_q/brokers/ironmq.py
@@ -0,0 +1,50 @@
+from requests.exceptions import HTTPError
+from django_q.conf import Conf
+from django_q.brokers import Broker
+from iron_mq import IronMQ
... 11334 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/django-q.git
More information about the Python-modules-commits mailing list