[med-svn] [biomaj3-daemon] 01/02: New upstream version 3.0.12
Olivier Sallou
osallou at debian.org
Thu Aug 17 14:45:29 UTC 2017
This is an automated email from the git hooks/post-receive script.
osallou pushed a commit to branch master
in repository biomaj3-daemon.
commit e186f84f59cb7ed8b76d8dface52718a88aeba42
Author: Olivier Sallou <osallou at debian.org>
Date: Thu Aug 17 12:56:59 2017 +0000
New upstream version 3.0.12
---
.gitignore | 72 +++
CHANGES.txt | 31 ++
MANIFEST.in | 1 +
README.md | 37 ++
bin/biomaj_daemon_consumer.py | 50 ++
biomaj_daemon/__init__.py | 0
biomaj_daemon/daemon/__init__.py | 0
biomaj_daemon/daemon/biomaj_daemon_web.py | 452 ++++++++++++++++++
biomaj_daemon/daemon/daemon_service.py | 232 ++++++++++
biomaj_daemon/daemon/utils.py | 739 ++++++++++++++++++++++++++++++
biomaj_daemon/daemon/wsgi.py | 4 +
config.yml | 45 ++
gunicorn_conf.py | 3 +
requirements.txt | 9 +
setup.cfg | 2 +
setup.py | 63 +++
16 files changed, 1740 insertions(+)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..754c2a7
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,72 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+
+# Coveralls
+.coveralls.yml
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.cache
+nosetests.xml
+coverage.xml
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# PyCharm
+.idea
+
+# Vim
+.viminfo
+# Less history
+.lesshst
+
+.dbshell
+.emacs*
+.ipython
+.mongo*
+#*.properties
+
diff --git a/CHANGES.txt b/CHANGES.txt
new file mode 100644
index 0000000..edfdc50
--- /dev/null
+++ b/CHANGES.txt
@@ -0,0 +1,31 @@
+3.0.12:
+ Add biomaj_daemon_consumer.py to python scripts for install with package
+3.0.11:
+ Disable daemon web service logging
+3.0.10:
+ Fix tail endpoint syntax
+3.0.9:
+ Fix #1 remove debug log
+ Fix #2 log dir not removed with --remove-all if proxy option not set
+3.0.8:
+ Fix #78, for multiple banks update in monolithic config, if one fails, next banks are not updated
+ Add /api/daemon/bank/x/log[/y] endpoint to get last bank session log file
+3.0.7:
+ Add whatsup support for non proxy config
+ Skip bank config with errors
+3.0.6:
+ Fix logging for monolithic setup
+ Add whatsup option
+ Fix prometheus stats
+3.0.5:
+ Fix for python 2.x
+3.0.4:
+ Fix status page with other services
+ Add missing README in package
+3.0.3:
+ Fix missing parameters
+3.0.2:
+ Move options to management to utils for reuse
+ Fix --about-me
+3.0.1:
+ Micro service to manage biomaj updates
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..376b77a
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1 @@
+include *.txt *.md
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..1681033
--- /dev/null
+++ b/README.md
@@ -0,0 +1,37 @@
+# About
+
+Microservice to manage biomaj, acts as a frontend to receive biomaj-cli commands and execute operations
+
+Needs mongo and redis
+
+
+
+# Development
+
+ flake8 --ignore E501 biomaj_daemon
+
+
+# Run
+
+## Message consumer:
+
+ export BIOMAJ_CONFIG=path_to_config.yml
+ python bin/biomaj_daemon_consumer.py
+
+## Web server
+
+If package is installed via pip, you need a file named *gunicorn_conf.py* containing somehwhere on local server:
+
+ def worker_exit(server, worker):
+ from prometheus_client import multiprocess
+ multiprocess.mark_process_dead(worker.pid)
+
+If you cloned the repository and installed it via python setup.py install, just refer to the *gunicorn_conf.py* in the cloned repository.
+
+ export BIOMAJ_CONFIG=path_to_config.yml
+ rm -rf ..path_to/godocker-prometheus-multiproc
+ mkdir -p ..path_to/godocker-prometheus-multiproc
+ export prometheus_multiproc_dir=..path_to/godocker-prometheus-multiproc
+ gunicorn -c ../path_to/gunicorn_conf.py biomaj_daemon.daemon.biomaj_daemon_web:app
+
+Web processes should be behind a proxy/load balancer, API base url /api/daemon
diff --git a/bin/biomaj_daemon_consumer.py b/bin/biomaj_daemon_consumer.py
new file mode 100644
index 0000000..3cb5954
--- /dev/null
+++ b/bin/biomaj_daemon_consumer.py
@@ -0,0 +1,50 @@
+import os
+import logging
+
+import requests
+import yaml
+
+from biomaj_daemon.daemon.daemon_service import DaemonService
+from biomaj_core.utils import Utils
+
+config_file = 'config.yml'
+if 'BIOMAJ_CONFIG' in os.environ:
+ config_file = os.environ['BIOMAJ_CONFIG']
+
+config = None
+with open(config_file, 'r') as ymlfile:
+ config = yaml.load(ymlfile)
+ Utils.service_config_override(config)
+
+
+def on_executed(bank, procs):
+ metrics = []
+ if not procs:
+ metric = {'bank': bank, 'error': 1}
+ metrics.append(metrics)
+ else:
+ for proc in procs:
+ metric = {'bank': bank}
+ if 'action' in proc:
+ metric['action'] = proc['action']
+ else:
+ metric['action'] = 'none'
+ if 'updated' in proc:
+ metric['updated'] = 1 if proc['updated'] else 0
+ else:
+ metric['updated'] = 0
+ if 'error' in proc and proc['error']:
+ metric['error'] = 1
+ else:
+ metric['execution_time'] = proc['execution_time']
+ metrics.append(metric)
+ try:
+ requests.post(config['web']['local_endpoint'] + '/api/daemon/metrics', json=metrics)
+ except Exception as e:
+ logging.error('Failed to post metrics: ' + str(e))
+
+
+process = DaemonService(config_file)
+process.on_executed_callback(on_executed)
+process.supervise()
+process.wait_for_messages()
diff --git a/biomaj_daemon/__init__.py b/biomaj_daemon/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/biomaj_daemon/daemon/__init__.py b/biomaj_daemon/daemon/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/biomaj_daemon/daemon/biomaj_daemon_web.py b/biomaj_daemon/daemon/biomaj_daemon_web.py
new file mode 100644
index 0000000..29a160e
--- /dev/null
+++ b/biomaj_daemon/daemon/biomaj_daemon_web.py
@@ -0,0 +1,452 @@
+import ssl
+import os
+import yaml
+import logging
+from collections import deque
+
+from flask import Flask
+from flask import jsonify
+from flask import request
+from flask import abort
+from flask import Response
+
+import requests
+
+from prometheus_client import Counter
+from prometheus_client import Gauge
+from prometheus_client.exposition import generate_latest
+from prometheus_client import multiprocess
+from prometheus_client import CollectorRegistry
+
+import consul
+import redis
+from pymongo import MongoClient
+import pika
+
+from biomaj.schema_version import SchemaVersion
+
+from biomaj.options import Options as BmajOptions
+from biomaj_core.config import BiomajConfig
+from biomaj_core.utils import Utils
+from biomaj.bank import Bank
+
+from biomaj_daemon.daemon.utils import biomaj_client_action
+
+config_file = 'config.yml'
+if 'BIOMAJ_CONFIG' in os.environ:
+ config_file = os.environ['BIOMAJ_CONFIG']
+
+config = None
+with open(config_file, 'r') as ymlfile:
+ config = yaml.load(ymlfile)
+ Utils.service_config_override(config)
+
+BiomajConfig.load_config(config['biomaj']['config'])
+
+last_status_check = None
+last_status = None
+
+data_dir = BiomajConfig.global_config.get('GENERAL', 'data.dir')
+if not os.path.exists(data_dir):
+ os.makedirs(data_dir)
+log_dir = BiomajConfig.global_config.get('GENERAL', 'log.dir')
+if not os.path.exists(log_dir):
+ os.makedirs(log_dir)
+process_dir = BiomajConfig.global_config.get('GENERAL', 'process.dir')
+if not os.path.exists(process_dir):
+ os.makedirs(process_dir)
+cache_dir = BiomajConfig.global_config.get('GENERAL', 'cache.dir')
+if not os.path.exists(cache_dir):
+ os.makedirs(cache_dir)
+lock_dir = BiomajConfig.global_config.get('GENERAL', 'lock.dir')
+if not os.path.exists(lock_dir):
+ os.makedirs(lock_dir)
+
+redis_client = redis.StrictRedis(
+ host=config['redis']['host'],
+ port=config['redis']['port'],
+ db=config['redis']['db'],
+ decode_responses=True
+)
+
+logging.info("Check database schema and upgrade if necessary")
+SchemaVersion.migrate_pendings()
+
+app = Flask(__name__)
+
+biomaj_metric = Counter("biomaj_daemon_total", "Bank total update execution.", ['bank', 'action', 'updated'])
+biomaj_error_metric = Counter("biomaj_daemon_errors", "Bank total update errors.", ['bank', 'action'])
+biomaj_time_metric = Gauge("biomaj_daemon_time", "Bank update execution time in seconds.", ['bank', 'action', 'updated'], multiprocess_mode='all')
+
+
+def consul_declare(config):
+ if config['consul']['host']:
+ consul_agent = consul.Consul(host=config['consul']['host'])
+ consul_agent.agent.service.register('biomaj-daemon', service_id=config['consul']['id'], address=config['web']['hostname'], port=config['web']['port'], tags=['biomaj'])
+ check = consul.Check.http(url='http://' + config['web']['hostname'] + ':' + str(config['web']['port']) + '/api/daemon', interval=20)
+ consul_agent.agent.check.register(config['consul']['id'] + '_check', check=check, service_id=config['consul']['id'])
+
+
+consul_declare(config)
+
+OPTIONS_PARAMS = {
+ 'config': None,
+ 'check': False,
+ 'update': False,
+ 'fromscratch': False,
+ 'publish': False,
+ 'unpublish': False,
+ 'release': None,
+ 'from_task': None,
+ 'process': None,
+ 'log': None,
+ 'remove': False,
+ 'removeall': False,
+ 'removepending': False,
+ 'status': False,
+ 'bank': None,
+ 'owner': None,
+ 'stop_before': None,
+ 'stop_after': None,
+ 'freeze': False,
+ 'unfreeze': False,
+ 'force': False,
+ 'help': False,
+ 'search': False,
+ 'formats': None,
+ 'types': None,
+ 'query': None,
+ 'show': False,
+ 'newbank': None,
+ 'newdir': None,
+ 'visibility': None,
+ 'maintenance': None,
+ 'version': False,
+ 'statusko': False,
+ 'trace': False,
+ 'whatsup': False,
+ 'lastlog': None,
+ 'tail': 100
+}
+
+
+class Options(object):
+ def __init__(self, d):
+ self.__dict__ = d
+ for key in list(OPTIONS_PARAMS.keys()):
+ if not self.has_option(key):
+ setattr(self, key, OPTIONS_PARAMS[key])
+
+ def has_option(self, option):
+ if hasattr(self, option):
+ return True
+ else:
+ return False
+
+ def get_option(self, option):
+ """
+ Gets an option if present, else return None
+ """
+ if hasattr(self, option):
+ return getattr(self, option)
+ return None
+
+
+ at app.route('/api/daemon', methods=['GET'])
+def ping():
+ return jsonify({'msg': 'pong'})
+
+
+ at app.route('/api/daemon/status', methods=['GET'])
+def biomaj_status_info():
+ '''
+ if last_status_check is not None:
+ cur = datetime.datetime.now()
+ if (cur - last_status_check).total_seconds() < 10:
+ return status
+ last_status_check = datetime.datetime.now()
+ '''
+ status = {
+ 'status': [{'service': 'biomaj-public-proxy', 'status': 1, 'count': 1}],
+ 'biomaj_services': [],
+ 'general_services': [{
+ 'id': 'biomaj-public-proxy',
+ 'host': '',
+ 'status': True
+ }]
+ }
+
+ # Check redis
+ logging.debug("Status: check redis")
+ redis_ok = False
+ try:
+ pong = redis_client.ping()
+ if pong:
+ redis_ok = True
+ except Exception as e:
+ logging.error('Failed to ping redis: ' + str(e))
+ if not redis_ok:
+ status['status'].append({'service': 'biomaj-redis', 'status': -1, 'count': 0})
+ else:
+ status['status'].append({'service': 'biomaj-redis', 'status': 1, 'count': 1})
+ status['general_services'].append({
+ 'id': 'biomaj-redis',
+ 'host': config['redis']['host'],
+ 'status': True
+ })
+
+ # Check internal proxy
+ r = requests.get(config['web']['local_endpoint'] + '/api/user')
+ if not r.status_code == 200:
+ status['status'].append({'service': 'biomaj-internal-proxy', 'status': -1, 'count': 0})
+ else:
+ status['status'].append({'service': 'biomaj-internal-proxy', 'status': 1, 'count': 1})
+ status['general_services'].append({
+ 'id': 'biomaj-internal-proxy',
+ 'host': config['web']['local_endpoint'],
+ 'status': True
+ })
+
+ # Check mongo
+ logging.debug("Status: check mongo")
+ if 'mongo' in config:
+ mongo_ok = False
+ try:
+ biomaj_mongo = MongoClient(config['mongo']['url'])
+ biomaj_mongo.server_info()
+ mongo_ok = True
+ except Exception as e:
+ logging.error('Failed to connect to mongo')
+ if not mongo_ok:
+ status['status'].append({'service': 'biomaj-mongo', 'status': -1, 'count': 0})
+ else:
+ status['status'].append({'service': 'biomaj-mongo', 'status': 1, 'count': 1})
+ status['general_services'].append({
+ 'id': 'biomaj-mongo',
+ 'host': config['mongo']['url'],
+ 'status': True
+ })
+
+ # Check rabbitmq
+ logging.debug("Status: check rabbit")
+ if 'rabbitmq' in config and 'host' in config['rabbitmq'] and config['rabbitmq']['host']:
+ rabbit_ok = False
+ channel = None
+ try:
+ connection = None
+ rabbitmq_port = config['rabbitmq']['port']
+ rabbitmq_user = config['rabbitmq']['user']
+ rabbitmq_password = config['rabbitmq']['password']
+ rabbitmq_vhost = config['rabbitmq']['virtual_host']
+ if rabbitmq_user:
+ credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
+ connection = pika.BlockingConnection(pika.ConnectionParameters(config['rabbitmq']['host'], rabbitmq_port, rabbitmq_vhost, credentials))
+ else:
+ connection = pika.BlockingConnection(pika.ConnectionParameters(config['rabbitmq']['host']))
+ channel = connection.channel()
+ rabbit_ok = True
+ except Exception as e:
+ logging.error('Rabbitmq connection error: ' + str(e))
+ finally:
+ if channel:
+ channel.close()
+ if not rabbit_ok:
+ status['status'].append({'service': 'biomaj-rabbitmq', 'status': -1, 'count': 0})
+ else:
+ status['status'].append({'service': 'biomaj-rabbitmq', 'status': 1, 'count': 1})
+ status['general_services'].append({
+ 'id': 'biomaj-rabbitmq',
+ 'host': config['rabbitmq']['host'],
+ 'status': True
+ })
+
+ logging.debug("Status: check consul services")
+ r = requests.get('http://' + config['consul']['host'] + ':8500/v1/agent/services')
+ if not r.status_code == 200:
+ status['status'].append({'service': 'biomaj-consul', 'status': -1, 'count': 0})
+ last_status = status
+ return status
+
+ status['status'].append({'service': 'biomaj-consul', 'status': 1, 'count': 1})
+ status['general_services'].append({
+ 'id': 'biomaj-consul',
+ 'host': config['consul']['host'],
+ 'status': True
+ })
+
+ consul_services = r.json()
+ services = []
+ for consul_service in list(consul_services.keys()):
+ current_service = consul_services[consul_service]['Service']
+ if current_service in services or current_service == 'consul':
+ continue
+ else:
+ services.append(current_service)
+ check_r = requests.get('http://' + config['consul']['host'] + ':8500/v1/health/service/' + consul_services[consul_service]['Service'])
+ if not check_r.status_code == 200:
+ status['status'].append({'service': 'biomaj-consul', 'status': -1, 'count': 0})
+ last_status = status
+ return status
+ checks = check_r.json()
+ nb_service = 0
+ nb_service_ok = 0
+ for service_check in checks:
+ nb_service += 1
+ check_status = True
+ for check in service_check['Checks']:
+ if check['Status'] != 'passing':
+ check_status = False
+ break
+ if check_status:
+ nb_service_ok += 1
+ status['biomaj_services'].append({
+ 'id': service_check['Service']['Service'],
+ 'host': service_check['Service']['Address'],
+ 'status': check_status
+ })
+ check_status = -1
+ if nb_service == nb_service_ok:
+ check_status = 1
+ elif nb_service_ok == 0:
+ check_status = -1
+ else:
+ check_status = 0
+ status['status'].append({'service': consul_services[consul_service]['Service'], 'count': nb_service, 'status': check_status})
+
+ # Check missing services
+ biomaj_services = ['biomaj-watcher', 'biomaj-daemon', 'biomaj-download', 'biomaj-process', 'biomaj-user', 'biomaj-cron', 'biomaj-ftp']
+ for biomaj_service in biomaj_services:
+ if biomaj_service not in services:
+ status['status'].append({'service': biomaj_service, 'count': 0, 'status': -1})
+
+ last_status = status
+
+ return jsonify(status)
+
+ at app.route('/api/daemon/bank/<bank>/log', methods=['GET'])
+def biomaj_bank_log(bank):
+ return biomaj_bank_log_tail(bank, 0)
+
+ at app.route('/api/daemon/bank/<bank>/log/<tail>', methods=['GET'])
+def biomaj_bank_log_tail(bank, tail=100):
+ apikey = request.headers.get('Authorization')
+ token = None
+
+ if apikey:
+ bearer = apikey.split()
+ if bearer[0] == 'APIKEY':
+ token = bearer[1]
+ log_file = None
+ try:
+ user = None
+ options_object = Options(OPTIONS_PARAMS)
+ if token:
+ r = requests.get(config['web']['local_endpoint'] + '/api/user/info/apikey/' + token)
+ if not r.status_code == 200:
+ abort(404, {'message': 'Invalid API Key or connection issue'})
+ user = r.json()['user']
+ options_object = Options({'user': user['id']})
+
+ bank_log = Bank(bank, options=options_object, no_log=True)
+ if bank_log.bank['properties']['visibility'] != 'public' and not bank_log.is_owner():
+ abort(403, {'message': 'not authorized to access this bank'})
+ if 'status' not in bank_log.bank or 'log_file' not in bank_log.bank['status'] or not bank_log.bank['status']['log_file']['status']:
+ return "No log file available"
+ log_file = bank_log.bank['status']['log_file']['status']
+ except Exception as e:
+ logging.exception(e)
+ return "Failed to access log file: " + str(e)
+ if not log_file or not os.path.exists(log_file):
+ return "Cannot access log file %s" % (str(log_file))
+
+ def generate():
+ with open(log_file) as fp:
+ tail_l = int(tail)
+ if tail_l == 0:
+ for line in fp:
+ yield line
+ else:
+ dq = deque(fp, maxlen=tail_l)
+ for line in dq:
+ yield line
+ yield "##END_OF_LOG"
+
+ return Response(generate(), mimetype='text/plain')
+
+
+
+ at app.route('/api/daemon', methods=['POST'])
+def biomaj_daemon():
+ '''
+ Execute a command request (bank update, removal, ...)
+ '''
+ apikey = request.headers.get('Authorization')
+ token = None
+
+ if apikey:
+ bearer = apikey.split()
+ if bearer[0] == 'APIKEY':
+ token = bearer[1]
+ try:
+ params = request.get_json()
+ options = params['options']
+ options_object = Options(options)
+ options_object.token = token
+ options_object.user = None
+ options_object.redis_host = config['redis']['host']
+ options_object.redis_port = config['redis']['port']
+ options_object.redis_db = config['redis']['db']
+ options_object.redis_prefix = config['redis']['prefix']
+
+ user = None
+ if token:
+ r = requests.get(config['web']['local_endpoint'] + '/api/user/info/apikey/' + token)
+ if not r.status_code == 200:
+ abort(404, {'message': 'Invalid API Key or connection issue'})
+ user = r.json()['user']
+ if user:
+ options_object.user = user['id']
+
+ if options_object.bank:
+ bmaj_options = BmajOptions(options_object)
+ BiomajConfig(options_object.bank, bmaj_options)
+
+ if not options_object.search and not options_object.show and not options_object.check and not options_object.status:
+ if not user:
+ abort(403, {'message': 'This action requires authentication with api key'})
+
+ (res, msg) = biomaj_client_action(options_object, config)
+ except Exception as e:
+ logging.exception(e)
+ return jsonify({'status': False, 'msg': str(e)})
+ return jsonify({'status': res, 'msg': msg})
+
+
+ at app.route('/metrics', methods=['GET'])
+def metrics():
+ registry = CollectorRegistry()
+ multiprocess.MultiProcessCollector(registry)
+ return generate_latest(registry)
+
+
+ at app.route('/api/daemon/metrics', methods=['POST'])
+def add_metrics():
+ '''
+ Expects a JSON request with an array of {'bank': 'bank_name', 'error': 'error_message', 'execution_time': seconds_to_execute, 'action': 'update|remove', 'updated': 0|1}
+ '''
+ procs = request.get_json()
+ for proc in procs:
+ if 'error' in proc and proc['error']:
+ biomaj_error_metric.labels(proc['bank'], proc['action']).inc()
+ else:
+ biomaj_metric.labels(proc['bank'], proc['action'], proc['updated']).inc()
+ biomaj_time_metric.labels(proc['bank'], proc['action'], proc['updated']).set(proc['execution_time'])
+ return jsonify({'msg': 'OK'})
+
+
+if __name__ == "__main__":
+ context = None
+ if config['tls']['cert']:
+ context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
+ context.load_cert_chain(config['tls']['cert'], config['tls']['key'])
+ app.run(host='0.0.0.0', port=config['web']['port'], ssl_context=context, threaded=True, debug=config['web']['debug'])
diff --git a/biomaj_daemon/daemon/daemon_service.py b/biomaj_daemon/daemon/daemon_service.py
new file mode 100644
index 0000000..434a50f
--- /dev/null
+++ b/biomaj_daemon/daemon/daemon_service.py
@@ -0,0 +1,232 @@
+import logging
+import logging.config
+import yaml
+import traceback
+import datetime
+import time
+import json
+import threading
+
+import redis
+import consul
+from flask import Flask
+from flask import jsonify
+
+from biomaj_core.config import BiomajConfig
+from biomaj.bank import Bank
+from biomaj.notify import Notify
+from biomaj_core.utils import Utils
+
+from biomaj_zipkin.zipkin import Zipkin
+
+
+app = Flask(__name__)
+app_log = logging.getLogger('werkzeug')
+app_log.setLevel(logging.ERROR)
+
+
+ at app.route('/api/daemon-message')
+def ping():
+ return jsonify({'msg': 'pong'})
+
+
+def start_web(config):
+ app.run(host='0.0.0.0', port=config['web']['port'])
+
+
+def consul_declare(config):
+ if config['consul']['host']:
+ consul_agent = consul.Consul(host=config['consul']['host'])
+ consul_agent.agent.service.register(
+ 'biomaj-daemon-message',
+ service_id=config['consul']['id'],
+ address=config['web']['hostname'],
+ port=config['web']['port'],
+ tags=['biomaj']
+ )
+ check = consul.Check.http(
+ url='http://' + config['web']['hostname'] + ':' + str(config['web']['port']) + '/api/daemon-message',
+ interval=20
+ )
+ consul_agent.agent.check.register(
+ config['consul']['id'] + '_check',
+ check=check,
+ service_id=config['consul']['id']
+ )
+ return True
+ else:
+ return False
+
+
+class Options(object):
+ def __init__(self, d):
+ self.__dict__ = d
+
+ def get_option(self, option):
+ """
+ Gets an option if present, else return None
+ """
+ if hasattr(self, option):
+ return getattr(self, option)
+ return None
+
+
+class DaemonService(object):
+
+ channel = None
+
+ def supervise(self):
+ if consul_declare(self.config):
+ web_thread = threading.Thread(target=start_web, args=(self.config,))
+ web_thread.start()
+
+ def __init__(self, config_file):
+ self.logger = logging
+ self.session = None
+ self.executed_callback = None
+ with open(config_file, 'r') as ymlfile:
+ self.config = yaml.load(ymlfile)
+ Utils.service_config_override(self.config)
+
+ Zipkin.set_config(self.config)
+
+ BiomajConfig.load_config(self.config['biomaj']['config'])
+
+ if 'log_config' in self.config:
+ for handler in list(self.config['log_config']['handlers'].keys()):
+ self.config['log_config']['handlers'][handler] = dict(self.config['log_config']['handlers'][handler])
+ logging.config.dictConfig(self.config['log_config'])
+ self.logger = logging.getLogger('biomaj')
+
+ self.redis_client = redis.StrictRedis(
+ host=self.config['redis']['host'],
+ port=self.config['redis']['port'],
+ db=self.config['redis']['db'],
+ decode_responses=True
+ )
+
+ self.logger.info('Daemon service started')
+
+ def close(self):
+ if self.channel:
+ self.channel.close()
+
+ def on_executed_callback(self, func):
+ self.executed_callback = func
+
+ def __start_action(self, bank, action):
+ whatsup = bank + ':' + str(action)
+ self.redis_client.hset(
+ self.config['redis']['prefix'] + ':daemons:status',
+ self.config['consul']['id'],
+ whatsup
+ )
+ # Expires in 7 days if no update
+ self.redis_client.expire(
+ self.config['redis']['prefix'] + ':daemons:status',
+ 3600*24*7
+ )
+
+ def __end_action(self):
+ self.redis_client.hset(
+ self.config['redis']['prefix'] + ':daemons:status',
+ self.config['consul']['id'],
+ 'pending'
+ )
+
+ def execute(self, options):
+ '''
+ Execute update or remove command
+ '''
+ start_time = datetime.datetime.now()
+ start_time = time.mktime(start_time.timetuple())
+
+ is_ok = None
+ is_updated = False
+ action = None
+ try:
+ options.no_log = False
+ if options.update:
+ action = 'update'
+ self.__start_action(options.bank, action)
+ bmaj = Bank(options.bank, options)
+ self.logger.debug('Log file: ' + bmaj.config.log_file)
+ is_ok = bmaj.update(depends=True)
+ is_updated = bmaj.session.get('update')
+ Notify.notifyBankAction(bmaj)
+ self.__end_action()
+ elif options.remove or options.removeall:
+ action = 'remove'
+ self.__start_action(options.bank, action)
+ if options.removeall:
+ bmaj = Bank(options.bank, options, no_log=True)
+ print('Log file: ' + bmaj.config.log_file)
+ is_ok = bmaj.removeAll(options.force)
+ else:
+ bmaj = Bank(options.bank, options)
+ self.logger.debug('Log file: ' + bmaj.config.log_file)
+ is_ok = bmaj.remove(options.release)
+ Notify.notifyBankAction(bmaj)
+ self.__end_action()
+ elif options.removepending:
+ bmaj = Bank(options.bank, options, no_log=True)
+ bmaj.remove_pending(options.release)
+ except Exception as e:
+ self.logger.exception('Exception: ' + str(e))
+ is_ok = False
+
+ end_time = datetime.datetime.now()
+ end_time = time.mktime(end_time.timetuple())
+
+ execution_time = end_time - start_time
+ return {
+ 'error': not is_ok,
+ 'execution_time': execution_time,
+ 'action': action,
+ 'updated': is_updated
+ }
+
+ def callback_messages(self, body):
+ '''
+ Manage bank update or removal
+ '''
+ try:
+ options = Options(body)
+ info = self.execute(options)
+ if self.executed_callback and options.bank:
+ self.executed_callback(options.bank, [info])
+ except Exception as e:
+ self.logger.error('Error with message: %s' % (str(e)))
+ traceback.print_exc()
+
+ def wait_for_messages(self):
+ '''
+ Loop queue waiting for messages
+ '''
+ while True:
+ msg = self.redis_client.rpop(self.config['redis']['prefix'] + ':queue')
+ if msg:
+ msg = json.loads(msg)
+ info = None
+ if 'update' in msg and msg['update']:
+ info = 'update'
+ if 'remove' in msg and msg['remove']:
+ info = 'remove'
+ if 'removeall' in msg and msg['removeall']:
+ info = 'removeall'
+ logging.info('Biomaj:Daemon:' + msg['bank'] + ':' + str(info))
+
+ span = None
+ if 'trace' in msg and msg['trace']:
+ span = Zipkin('biomaj', 'workflow')
+ span.add_binary_annotation('operation', info)
+ msg['traceId'] = span.get_trace_id()
+ msg['spanId'] = span.get_span_id()
+
+ self.callback_messages(msg)
+
+ if span:
+ span.trace()
+
+ logging.info('Biomaj:Daemon:' + msg['bank'] + ':over')
+ time.sleep(1)
diff --git a/biomaj_daemon/daemon/utils.py b/biomaj_daemon/daemon/utils.py
new file mode 100644
index 0000000..8c7efeb
--- /dev/null
+++ b/biomaj_daemon/daemon/utils.py
@@ -0,0 +1,739 @@
+import os
+import pkg_resources
+import shutil
+import json
+import datetime
+import time
+import redis
+import sys
+import logging
+
+import requests
+
+from tabulate import tabulate
+
+from biomaj.bank import Bank
+from biomaj_core.config import BiomajConfig
+from biomaj.workflow import Workflow
+from biomaj.workflow import UpdateWorkflow
+from biomaj.workflow import RemoveWorkflow
+from biomaj.notify import Notify
+
+if sys.version < '3':
+ import ConfigParser as configparser
+else:
+ import configparser
+
+
+def biomaj_version(options, config):
+ '''
+ Get biomaj version
+ '''
+ version = pkg_resources.require('biomaj')[0].version
+ return (True, 'Version: ' + str(version))
+
+
+def check_options(options, config):
+ if options.stop_after or options.stop_before or options.from_task:
+ available_steps = []
+ for flow in UpdateWorkflow.FLOW:
+ available_steps.append(flow['name'])
+ for flow in RemoveWorkflow.FLOW:
+ available_steps.append(flow['name'])
+ if options.stop_after:
+ if options.stop_after not in available_steps:
+ return (False, 'Invalid step: ' + options.stop_after)
+ if options.stop_before:
+ if options.stop_before not in available_steps:
+ return (False, 'Invalid step: ' + options.stop_before)
+ if options.from_task:
+ if options.from_task not in available_steps:
+ return (False, 'Invalid step: ' + options.from_task)
+ return (True, None)
+
+
+def biomaj_maintenance(options, config):
+ '''
+ Maintenance management
+ '''
+ if options.maintenance not in ['on', 'off', 'status']:
+ print("Wrong maintenance value [on,off,status]")
+ return (False, "Wrong maintenance value [on,off,status]")
+
+ data_dir = BiomajConfig.global_config.get('GENERAL', 'data.dir')
+ if BiomajConfig.global_config.has_option('GENERAL', 'lock.dir'):
+ lock_dir = BiomajConfig.global_config.get('GENERAL', 'lock.dir')
+ else:
+ lock_dir = data_dir
+
+ maintenance_lock_file = os.path.join(lock_dir, 'biomaj.lock')
+ if options.maintenance == 'status':
+ if os.path.exists(maintenance_lock_file):
+ return (True, "Maintenance: On")
+ else:
+ return (True, "Maintenance: Off")
+
+ if options.maintenance == 'on':
+ f = open(maintenance_lock_file, 'w')
+ f.write('1')
+ f.close()
+ return (True, "Maintenance set to On")
+
+ if options.maintenance == 'off':
+ if os.path.exists(maintenance_lock_file):
+ os.remove(maintenance_lock_file)
+ return (True, "Maintenance set to Off")
+
+
+def biomaj_owner(options, config):
+ '''
+ Bank ownership management
+ '''
+ if not options.bank:
+ return (False, "Bank option is missing")
+ bank = Bank(options.bank, options=options, no_log=True)
+ bank.set_owner(options.owner)
+ return (True, None)
+
+
+def biomaj_visibility(options, config):
+ '''
+ Bank visibility management
+ '''
+ if not options.bank:
+ return (False, "Bank option is missing")
+ if options.visibility not in ['public', 'private']:
+ return (False, "Valid values are public|private")
+
+ bank = Bank(options.bank, options=options, no_log=True)
+ bank.set_visibility(options.visibility)
+ return (True, "Do not forget to update accordingly the visibility.default parameter in the configuration file")
+
+
+def biomaj_move_production_directories(options, config):
+ '''
+ Change bank production directories
+ '''
+ if not options.bank:
+ return (False, "Bank option is missing")
+
+ if not os.path.exists(options.newdir):
+ return (False, "Destination directory does not exists")
+
+ bank = Bank(options.bank, options=options, no_log=True)
+ if not bank.bank['production']:
+ return (False, "Nothing to move, no production directory")
+
+ bank.load_session(Workflow.FLOW, None)
+ w = Workflow(bank)
+ res = w.wf_init()
+ if not res:
+ return (False, 'Bank initialization failure')
+
+ for prod in bank.bank['production']:
+ session = bank.get_session_from_release(prod['release'])
+ bank.load_session(Workflow.FLOW, session)
+ prod_path = bank.session.get_full_release_directory()
+ if os.path.exists(prod_path):
+ shutil.move(prod_path, options.newdir)
+ prod['data_dir'] = options.newdir
+ bank.banks.update({'name': options.bank}, {'$set': {'production': bank.bank['production']}})
+ w.wf_over()
+ return (True, "Bank production directories moved to " + options.newdir + "\nWARNING: do not forget to update accordingly the data.dir and dir.version properties")
+
+
+def biomaj_newbank(options, config):
+ '''
+ Rename a bank
+ '''
+ if not options.bank:
+ return (False, "Bank option is missing")
+
+ bank = Bank(options.bank, options=options, no_log=True)
+ conf_dir = BiomajConfig.global_config.get('GENERAL', 'conf.dir')
+ bank_prop_file = os.path.join(conf_dir, options.bank + '.properties')
+ config_bank = configparser.SafeConfigParser()
+ config_bank.read([os.path.join(conf_dir, options.bank + '.properties')])
+ config_bank.set('GENERAL', 'db.name', options.newbank)
+ newbank_prop_file = open(os.path.join(conf_dir, options.newbank + '.properties'), 'w')
+ config_bank.write(newbank_prop_file)
+ newbank_prop_file.close()
+ bank.banks.update({'name': options.bank}, {'$set': {'name': options.newbank}})
+ os.remove(bank_prop_file)
+ return (True, "Bank " + options.bank + " renamed to " + options.newbank)
+
+
+def biomaj_search(options, config):
+ '''
+ Search within banks
+ '''
+ msg = ''
+ if options.query:
+ res = Bank.searchindex(options.query)
+ msg += 'Query matches for :' + options.query + '\n'
+ results = [["Release", "Format(s)", "Type(s)", "Files"]]
+ for match in res:
+ results.append([match['_source']['release'],
+ str(match['_source']['format']),
+ str(match['_source']['types']),
+ ','.join(match['_source']['files'])])
+ msg += tabulate(results, headers="firstrow", tablefmt="grid")
+ else:
+ formats = []
+ if options.formats:
+ formats = options.formats.split(',')
+ types = []
+ if options.types:
+ types = options.types.split(',')
+ msg += "Search by formats=" + str(formats) + ", types=" + str(types) + '\n'
+ res = Bank.search(formats, types, False)
+ results = [["Name", "Release", "Format(s)", "Type(s)", 'Published']]
+ for bank in sorted(res, key=lambda bank: (bank['name'])):
+ b = bank['name']
+ bank['production'].sort(key=lambda n: n['release'], reverse=True)
+ for prod in bank['production']:
+ iscurrent = ""
+ if prod['session'] == bank['current']:
+ iscurrent = "yes"
+ results.append([b if b else '', prod['release'], ','.join(prod['formats']),
+ ','.join(prod['types']), iscurrent])
+ msg += tabulate(results, headers="firstrow", tablefmt="grid")
+ return (True, msg)
+
+
+def biomaj_show(options, config):
+ '''
+ show bank details
+ '''
+ if not options.bank:
+ return (False, "Bank option is required")
+
+ bank = Bank(options.bank, options=options, no_log=True)
+ results = [["Name", "Release", "Format(s)", "Type(s)", "Tag(s)", "File(s)"]]
+ current = None
+ fformat = None
+ if 'current' in bank.bank and bank.bank['current']:
+ current = bank.bank['current']
+ for prod in bank.bank['production']:
+ include = True
+ release = prod['release']
+ if current == prod['session']:
+ release += ' (current)'
+ if options.release and (prod['release'] != options.release and prod['prod_dir'] != options.release):
+ include = False
+ if include:
+ session = bank.get_session_from_release(prod['release'])
+ formats = session['formats']
+ afiles = []
+ atags = []
+ atypes = []
+ for fformat in list(formats.keys()):
+ for elt in formats[fformat]:
+ atypes.append(','.join(elt['types']))
+ for tag in list(elt['tags'].keys()):
+ atags.append(elt['tags'][tag])
+ for eltfile in elt['files']:
+ afiles.append(eltfile)
+ results.append([
+ bank.bank['name'],
+ release,
+ fformat,
+ ','.join(atypes),
+ ','.join(atags),
+ ','.join(afiles)])
+ msg = tabulate(results, headers="firstrow", tablefmt="grid")
+ return (True, msg)
+
+
+def biomaj_check(options, config):
+ '''
+ Check bank properties
+ '''
+ if not options.bank:
+ return (False, "Bank name is missing")
+
+ bank = Bank(options.bank, options=options, no_log=True)
+ msg = options.bank + " check: " + str(bank.check()) + "\n"
+ return (True, msg)
+
+
+def biomaj_status(options, config):
+ '''
+ Get bank status information
+ '''
+ msg = ''
+ if options.bank:
+ bank = Bank(options.bank, options=options, no_log=True)
+ if bank.bank['properties']['visibility'] != 'public' and not bank.is_owner():
+ return (False, 'Access forbidden')
+ info = bank.get_bank_release_info(full=True)
+ msg += tabulate(info['info'], headers='firstrow', tablefmt='psql') + '\n'
+ msg += tabulate(info['prod'], headers='firstrow', tablefmt='psql') + '\n'
+ # do we have some pending release(s)
+ if 'pend' in info and len(info['pend']) > 1:
+ msg += tabulate(info['pend'], headers='firstrow', tablefmt='psql') + '\n'
+ else:
+ banks = Bank.list()
+ # Headers of output table
+ banks_list = [["Name", "Type(s)", "Release", "Visibility", "Last update"]]
+ for bank in sorted(banks, key=lambda k: k['name']):
+ try:
+ bank = Bank(bank['name'], options=options, no_log=True)
+ if bank.bank['properties']['visibility'] == 'public' or bank.is_owner():
+ banks_list.append(bank.get_bank_release_info()['info'])
+ except Exception as e:
+ logging.error('Failed to load bank %s: %s' % (bank['name'], str(e)))
+ msg += tabulate(banks_list, headers="firstrow", tablefmt="psql") + '\n'
+ return (True, msg)
+
+
+def biomaj_status_ko(options, config):
+ '''
+ Get failed banks
+ '''
+ banks = Bank.list()
+ banks_list = [["Name", "Type(s)", "Release", "Visibility", "Last run"]]
+ for bank in sorted(banks, key=lambda k: k['name']):
+ try:
+ bank = Bank(bank['name'], options=options, no_log=True)
+ bank.load_session(UpdateWorkflow.FLOW)
+ if bank.session is not None:
+ if bank.use_last_session and not bank.session.get_status(Workflow.FLOW_OVER):
+ wf_status = bank.session.get('workflow_status')
+ if wf_status is None or not wf_status:
+ banks_list.append(bank.get_bank_release_info()['info'])
+ except Exception as e:
+ return (False, str(e))
+ return (True, tabulate(banks_list, headers="firstrow", tablefmt="psql"))
+
+
+def biomaj_bank_update_request(options, config):
+ '''
+ Send bank update request to rabbitmq
+ '''
+ return biomaj_send_message(options, config)
+
+
+def biomaj_whatsup(options, config):
+ redis_client = None
+ whatsup = []
+
+ if options.proxy:
+ redis_client = redis.StrictRedis(
+ host=config['redis']['host'],
+ port=config['redis']['port'],
+ db=config['redis']['db'],
+ decode_responses=True
+ )
+ if not redis_client:
+ return (False, 'Redis not configured')
+
+ pending_len = redis_client.llen(config['redis']['prefix'] + ':queue')
+ if not pending_len:
+ pending_len = 0
+
+ whatsup.append(['queue', str(pending_len), 'waiting'])
+
+ if not options.proxy:
+ data_dir = BiomajConfig.global_config.get('GENERAL', 'data.dir')
+ lock_dir = data_dir
+ if BiomajConfig.global_config.has_option('GENERAL', 'lock.dir'):
+ lock_dir = BiomajConfig.global_config.get('GENERAL', 'lock.dir')
+ lock_files = [f for f in os.listdir(lock_dir) if os.path.isfile(os.path.join(lock_dir, f))]
+ for lock_file in lock_files:
+ bank_name = lock_file.replace('.lock', '')
+ whatsup.append(['system', str(bank_name), 'running'])
+ if not whatsup:
+ whatsup.append(['system', '', 'pending'])
+ else:
+ daemons_status = redis_client.hgetall(config['redis']['prefix'] + ':daemons:status')
+ msg = 'All daemons pending'
+ for daemon in list(daemons_status.keys()):
+ if daemons_status[daemon]:
+ # bank:action
+ proc = daemons_status[daemon].split(':')
+ if len(proc) == 2:
+ whatsup.append([daemon] + proc)
+ else:
+ whatsup.append([daemon, ''] + proc)
+ if whatsup:
+ msg = tabulate(whatsup, ['daemon', 'bank', 'action'], tablefmt="simple")
+ return (True, msg)
+
+
+def biomaj_send_message(options, config):
+ '''
+ Send message to rabbitmq listener
+ '''
+ if not options.proxy:
+ return (False, 'option not allowed without --proxy option')
+ redis_client = redis.StrictRedis(
+ host=config['redis']['host'],
+ port=config['redis']['port'],
+ db=config['redis']['db'],
+ decode_responses=True
+ )
+ if not redis_client:
+ return (False, 'Redis not configured')
+ cur = datetime.datetime.now()
+ options.timestamp = time.mktime(cur.timetuple())
+ redis_client.lpush(config['redis']['prefix'] + ':queue', json.dumps(options.__dict__))
+ return (True, None)
+
+
+def biomaj_bank_update(options, config):
+ '''
+ Update a bank
+ '''
+ if not options.bank:
+ return (False, "Bank name is missing")
+ banks = options.bank.split(',')
+ gres = True
+ msg = ''
+ for bank in banks:
+ options.bank = bank
+ no_log = True
+ if not options.proxy:
+ no_log = False
+ # logging.debug('Options: '+str(options.__dict__))
+ bmaj = Bank(bank, options=options, no_log=no_log)
+ if bmaj.is_locked():
+ return (False, 'Bank is locked due to an other action')
+ check_status = bmaj.check()
+ if not check_status:
+ msg += 'Skip bank ' + options.bank + ': wrong config\n'
+ gres = False
+ continue
+ else:
+ msg += 'Bank update request sent for ' + options.bank + '\n'
+ if not options.proxy:
+ res = bmaj.update(depends=True)
+ Notify.notifyBankAction(bmaj)
+ else:
+ res = biomaj_bank_update_request(options, config)
+ if not res:
+ msg += 'Failed to send update request for ' + options.bank + '\n'
+ gres = False
+
+ if not gres:
+ return (False, msg)
+ return (True, msg)
+
+
+def biomaj_freeze(options, config):
+ '''
+ freeze a bank
+ '''
+ if not options.bank:
+ return (False, "Bank name is missing")
+ if not options.release:
+ return (False, "Bank release is missing")
+ bmaj = Bank(options.bank, options=options)
+ res = bmaj.freeze(options.release)
+ if not res:
+ return (False, 'Failed to freeze the bank release')
+ return (True, None)
+
+
+def biomaj_unfreeze(options, config):
+ '''
+ unfreeze a bank
+ '''
+ if not options.bank:
+ return (False, "Bank name is missing")
+ if not options.release:
+ return (False, "Bank release is missing")
+
+ bmaj = Bank(options.bank, options=options)
+ res = bmaj.unfreeze(options.release)
+ if not res:
+ return (False, 'Failed to unfreeze the bank release')
+ return (True, None)
+
+
+def biomaj_remove_request(options, config):
+ '''
+ Send remove request to rabbitmq
+ '''
+ return biomaj_send_message(options, config)
+
+
+def biomaj_remove(options, config):
+ '''
+ Remove a bank
+ '''
+ if not options.bank:
+ return (False, "Bank name is missing")
+
+ if options.remove and not options.release:
+ return (False, "Bank release is missing")
+
+ no_log = True
+ if not options.proxy:
+ no_log = False
+ # If removeall, do not set logs as log dir will be deleted
+ if options.removeall:
+ no_log = True
+ bmaj = Bank(options.bank, options=options, no_log=no_log)
+ if bmaj.is_locked():
+ return (False, 'Bank is locked due to an other action')
+
+ res = True
+ if options.removeall:
+ if not options.proxy:
+ res = bmaj.removeAll(options.force)
+ return (res, '')
+ res = biomaj_remove_request(options, config)
+ else:
+ if not options.proxy:
+ res = bmaj.remove(options.release)
+ Notify.notifyBankAction(bmaj)
+ return (res, '')
+ res = biomaj_remove_request(options, config)
+ if not res:
+ return (False, 'Failed to send removal request')
+ return (True, 'Bank removal request sent')
+
+
+def biomaj_remove_pending_request(options, config):
+ '''
+ Send remove pending request to rabbitmq
+ '''
+ return biomaj_send_message(options, config)
+
+
+def biomaj_remove_pending(options, config):
+ '''
+ Remove pending releases
+ '''
+ if not options.bank:
+ return (False, "Bank name is missing")
+ bmaj = Bank(options.bank, options=options, no_log=True)
+ if bmaj.is_locked():
+ return (False, 'Bank is locked due to an other action')
+ if not options.proxy:
+ res = bmaj.remove_pending(options.release)
+ return (res, '')
+ res = biomaj_remove_pending_request(options, config)
+ if not res:
+ return (False, 'Failed to send removal request')
+ return (True, 'Request sent')
+
+
+def biomaj_unpublish(options, config):
+ '''
+ Unpublish a bank
+ '''
+ if not options.bank:
+ return (False, "Bank name is missing")
+
+ bmaj = Bank(options.bank, options=options, no_log=True)
+ bmaj.load_session()
+ bmaj.unpublish()
+ return (True, None)
+
+
+def biomaj_publish(options, config):
+ '''
+ Publish a bank
+ '''
+ if not options.bank:
+ return (False, "Bank name or release is missing")
+ bmaj = Bank(options.bank, options=options, no_log=True)
+ bmaj.load_session()
+ bank = bmaj.bank
+ session = None
+ if options.get_option('release') is None:
+ # Get latest prod release
+ if len(bank['production']) > 0:
+ prod = bank['production'][len(bank['production']) - 1]
+ for s in bank['sessions']:
+ if s['id'] == prod['session']:
+ session = s
+ break
+ else:
+ # Search production release matching release
+ for prod in bank['production']:
+ if prod['release'] == options.release or prod['prod_dir'] == options.release:
+ # Search session related to this production release
+ for s in bank['sessions']:
+ if s['id'] == prod['session']:
+ session = s
+ break
+ break
+ if session is None:
+ return (False, "No production session could be found for this release")
+ bmaj.session._session = session
+ bmaj.publish()
+ return (True, None)
+
+
+def biomaj_update_cancel(options, config):
+ '''
+ Cancel current update of a Bank
+
+ Running actions (download, process) will continue on remote services but will not manage next actions.
+ Biomaj process will exit when ready with a *False* status.
+ '''
+ if not options.proxy:
+ return (False, 'option not allowed without --proxy option')
+ redis_client = redis.StrictRedis(
+ host=config['redis']['host'],
+ port=config['redis']['port'],
+ db=config['redis']['db'],
+ decode_responses=True
+ )
+ if not redis_client:
+ return (False, 'Redis not configured')
+ if not options.bank:
+ return (False, "Bank name is missing")
+ redis_client.set(config['redis']['prefix'] + ':' + options.bank + ':action:cancel', 1)
+ return (True, 'Requested to cancel update of bank ' + options.bank + ', update will stop once current actions are over')
+
+
+def biomaj_update_status(options, config):
+ '''
+ get the status of a bank during an update cycle
+ '''
+ if not options.proxy:
+ return (False, 'option not allowed without --proxy option')
+ redis_client = redis.StrictRedis(
+ host=config['redis']['host'],
+ port=config['redis']['port'],
+ db=config['redis']['db'],
+ decode_responses=True
+ )
+ if not redis_client:
+ return (False, 'Redis not configured')
+ pending = redis_client.llen(config['redis']['prefix'] + ':queue')
+ pending_actions = [['Pending actions', 'Date']]
+ for index in range(pending):
+ pending_action = redis_client.lindex(config['redis']['prefix'] + ':queue', index)
+ if pending_action:
+ pending_bank = json.loads(pending_action)
+ action_time = datetime.datetime.utcfromtimestamp(pending_bank['timestamp'])
+ if pending_bank['bank'] == options.bank:
+ if pending_bank['update']:
+ pending_actions.append(['Update', action_time])
+ elif pending_bank['remove'] or pending_bank['removeall']:
+ pending_actions.append(['Removal - release ' + str(pending_bank['release']), action_time])
+
+ bmaj = Bank(options.bank, options=options, no_log=True)
+ if 'status' not in bmaj.bank:
+ return (True, 'No update information available')
+ status_info = bmaj.bank['status']
+
+ msg = [["Workflow step", "Status"]]
+
+ if 'log_file' in status_info:
+ msg.append(['log_file', str(status_info['log_file']['status'])])
+ if 'session' in status_info:
+ msg.append(['session', str(status_info['session'])])
+ for flow in UpdateWorkflow.FLOW:
+ step = flow['name']
+ if step in status_info:
+ if status_info[step]['status'] is True:
+ msg.append([step, 'over'])
+ elif status_info[step]['status'] is False:
+ msg.append([step, 'error'])
+ else:
+ if status_info[Workflow.FLOW_OVER]['status'] is True:
+ msg.append([step, 'skipped'])
+ else:
+ msg.append([step, 'waiting'])
+ if step in ['postprocess', 'preprocess', 'removeprocess']:
+ progress = status_info[step]['progress']
+ for proc in list(progress.keys()):
+ msg.append([proc, str(progress[proc])])
+ return (True, tabulate(pending_actions, headers="firstrow", tablefmt="grid") + tabulate(msg, headers="firstrow", tablefmt="grid"))
+
+
+def biomaj_user_info(options, config):
+ '''
+ Get user info, need login/password
+ '''
+ if not options.userlogin or not options.userpassword:
+ return (False, 'Missing login or password')
+ if not options.proxy:
+ return (False, 'Missing proxy information')
+ bindinfo = {'type': 'password', 'value': options.userpassword}
+ try:
+ r = requests.post(config['web']['local_endpoint'] + '/api/user/bind/user/' + options.userlogin, json=bindinfo)
+ if not r.status_code == 200:
+ return (False, 'Invalid credentials')
+ user = r.json()['user']
+ except Exception as e:
+ return (False, 'Connection error to proxy: ' + str(e))
+ msg = 'User: ' + str(user['id']) + '\n'
+ msg += 'Email: ' + str(user['email']) + '\n'
+ msg += 'Api key: ' + str(user['apikey']) + '\n'
+ return (True, msg)
+
+
+def biomaj_client_action(options, config=None):
+ check_options(options, config)
+ if options.version:
+ return biomaj_version(options, config)
+
+ if options.whatsup:
+ return biomaj_whatsup(options, config)
+
+ if options.maintenance:
+ return biomaj_maintenance(options, config)
+
+ if options.owner:
+ return biomaj_owner(options, config)
+
+ if options.visibility:
+ return biomaj_visibility(options, config)
+
+ if options.newdir:
+ return biomaj_move_production_directories(options, config)
+
+ if options.newbank:
+ return biomaj_newbank(options, config)
+
+ if options.search:
+ return biomaj_search(options, config)
+
+ if options.show:
+ return biomaj_show(options, config)
+
+ if options.check:
+ return biomaj_check(options, config)
+
+ if options.status:
+ return biomaj_status(options, config)
+
+ if options.statusko:
+ return biomaj_status_ko(options, config)
+
+ if options.update:
+ return biomaj_bank_update(options, config)
+
+ if options.freeze:
+ return biomaj_freeze(options, config)
+
+ if options.unfreeze:
+ return biomaj_unfreeze(options, config)
+
+ if options.remove or options.removeall:
+ return biomaj_remove(options, config)
+
+ if options.removepending:
+ return biomaj_remove_pending(options, config)
+
+ if options.unpublish:
+ return biomaj_unpublish(options, config)
+
+ if options.publish:
+ return biomaj_publish(options, config)
+
+ if options.updatestatus:
+ return biomaj_update_status(options, config)
+
+ if options.updatecancel:
+ return biomaj_update_cancel(options, config)
+
+ if options.aboutme:
+ return biomaj_user_info(options, config)
diff --git a/biomaj_daemon/daemon/wsgi.py b/biomaj_daemon/daemon/wsgi.py
new file mode 100644
index 0000000..141af4f
--- /dev/null
+++ b/biomaj_daemon/daemon/wsgi.py
@@ -0,0 +1,4 @@
+from biomaj_daemon.daemon.biomaj_daemon_web import app
+
+if __name__ == "__main__":
+ app.run()
diff --git a/config.yml b/config.yml
new file mode 100644
index 0000000..c9804e3
--- /dev/null
+++ b/config.yml
@@ -0,0 +1,45 @@
+biomaj:
+ config: '/home/osallou/Development/NOSAVE/genouest/global.properties'
+
+redis:
+ host: '127.0.0.1'
+ #host: '131.254.17.40'
+ port: 6379
+ db: 0
+ prefix: 'biomajdaemon'
+
+
+consul:
+ host: null
+ # Unique agent identifier name among biomaj downloaders
+ id: 'biomaj_daemon_agent'
+
+web:
+ debug: true
+ port: 5002
+ local_endpoint: 'http://127.0.0.1:5000'
+
+tls:
+ key: null
+ cert: null
+
+log_config:
+ 'version': 1
+ 'formatters':
+ 'generic':
+ 'format': '%(asctime)s %(levelname)-5.5s [%(name)s][%(threadName)s] %(message)s'
+ 'handlers':
+ 'console':
+ 'class': 'logging.StreamHandler'
+ 'formatter': 'generic'
+ 'level': 'DEBUG'
+ 'loggers':
+ 'root':
+ 'level': 'INFO'
+ 'handlers':
+ - 'console'
+ 'biomaj':
+ 'level': 'DEBUG'
+ 'handlers':
+ - 'console'
+ 'disable_existing_loggers': False
diff --git a/gunicorn_conf.py b/gunicorn_conf.py
new file mode 100644
index 0000000..b5a65b0
--- /dev/null
+++ b/gunicorn_conf.py
@@ -0,0 +1,3 @@
+def worker_exit(server, worker):
+ from prometheus_client import multiprocess
+ multiprocess.mark_process_dead(worker.pid)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..301d3ed
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,9 @@
+biomaj
+biomaj-user
+biomaj-zipkin
+redis
+PyYAML
+flask
+python-consul
+prometheus_client>=0.0.18
+requests
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..3c6e79c
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,2 @@
+[bdist_wheel]
+universal=1
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..1bb64fc
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,63 @@
+try:
+ from setuptools import setup, find_packages
+except ImportError:
+ from distutils.core import setup
+
+from distutils.command.install import install
+import os
+
+
+here = os.path.abspath(os.path.dirname(__file__))
+with open(os.path.join(here, 'README.md')) as f:
+ README = f.read()
+with open(os.path.join(here, 'CHANGES.txt')) as f:
+ CHANGES = f.read()
+
+
+config = {
+ 'description': 'BioMAJ daemon service',
+ 'long_description': README + '\n\n' + CHANGES,
+ 'author': 'Olivier Sallou',
+ 'url': 'http://biomaj.genouest.org',
+ 'download_url': 'http://biomaj.genouest.org',
+ 'author_email': 'olivier.sallou at irisa.fr',
+ 'version': '3.0.12',
+ 'classifiers': [
+ # How mature is this project? Common values are
+ # 3 - Alpha
+ # 4 - Beta
+ # 5 - Production/Stable
+ 'Development Status :: 5 - Production/Stable',
+ 'Environment :: Console',
+ 'Natural Language :: English',
+ 'Operating System :: POSIX :: Linux',
+ # Indicate who your project is intended for
+ 'Intended Audience :: Science/Research',
+ 'Topic :: Scientific/Engineering :: Bio-Informatics',
+ # Pick your license as you wish (should match "license" above)
+ 'License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)',
+ # Specify the Python versions you support here. In particular, ensure
+ # that you indicate whether you support Python 2, Python 3 or both.
+ 'Programming Language :: Python :: 3',
+ 'Programming Language :: Python :: 3.4'
+ ],
+ 'install_requires': [
+ 'biomaj',
+ 'biomaj-user',
+ 'biomaj-zipkin',
+ 'redis',
+ 'PyYAML',
+ 'flask',
+ 'python-consul',
+ 'prometheus_client>=0.0.18',
+ 'requests'
+ ],
+ 'tests_require': ['nose', 'mock'],
+ 'test_suite': 'nose.collector',
+ 'packages': find_packages(),
+ 'include_package_data': True,
+ 'scripts': ['bin/biomaj_daemon_consumer.py'],
+ 'name': 'biomaj_daemon'
+}
+
+setup(**config)
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/biomaj3-daemon.git
More information about the debian-med-commit
mailing list