[Python-modules-commits] [pyclamd] 02/04: Import pyclamd_0.3.17.orig.tar.gz
Scott Kitterman
kitterman at moszumanska.debian.org
Thu Nov 17 21:32:27 UTC 2016
This is an automated email from the git hooks/post-receive script.
kitterman pushed a commit to branch master
in repository pyclamd.
commit 520d22dc7c9944090d90caf37de15e6f231eccac
Author: Scott Kitterman <scott at kitterman.com>
Date: Thu Nov 17 16:30:09 2016 -0500
Import pyclamd_0.3.17.orig.tar.gz
---
PKG-INFO | 2 +-
pyClamd.egg-info/PKG-INFO | 2 +-
pyClamd.egg-info/SOURCES.txt | 3 +-
pyclamd/pyclamd.py | 48 +++++----
pyclamd/test_pyclamd.py | 231 +++++++++++++++++++++++++++++++++++++++++++
setup.cfg | 4 +-
6 files changed, 264 insertions(+), 26 deletions(-)
diff --git a/PKG-INFO b/PKG-INFO
index 11321ed..3bc6132 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: pyClamd
-Version: 0.3.16
+Version: 0.3.17
Summary: pyClamd is a python interface to Clamd (Clamav daemon).
Home-page: http://xael.org/pages/pyclamd-en.html
Author: Alexandre Norman
diff --git a/pyClamd.egg-info/PKG-INFO b/pyClamd.egg-info/PKG-INFO
index 11321ed..3bc6132 100644
--- a/pyClamd.egg-info/PKG-INFO
+++ b/pyClamd.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: pyClamd
-Version: 0.3.16
+Version: 0.3.17
Summary: pyClamd is a python interface to Clamd (Clamav daemon).
Home-page: http://xael.org/pages/pyclamd-en.html
Author: Alexandre Norman
diff --git a/pyClamd.egg-info/SOURCES.txt b/pyClamd.egg-info/SOURCES.txt
index d9647e3..fd7509a 100644
--- a/pyClamd.egg-info/SOURCES.txt
+++ b/pyClamd.egg-info/SOURCES.txt
@@ -5,4 +5,5 @@ pyClamd.egg-info/SOURCES.txt
pyClamd.egg-info/dependency_links.txt
pyClamd.egg-info/top_level.txt
pyclamd/__init__.py
-pyclamd/pyclamd.py
\ No newline at end of file
+pyclamd/pyclamd.py
+pyclamd/test_pyclamd.py
\ No newline at end of file
diff --git a/pyclamd/pyclamd.py b/pyclamd/pyclamd.py
index 0a9ca9c..9fea4e8 100644
--- a/pyclamd/pyclamd.py
+++ b/pyclamd/pyclamd.py
@@ -129,7 +129,7 @@ True
>>> os.remove('/tmp/EICAR-éèô请收藏我们的网址')
"""
-__version__ = "0.3.16"
+__version__ = "0.3.17"
# $Source$
@@ -140,6 +140,7 @@ import sys
import socket
import struct
import base64
+import time
############################################################################
@@ -470,7 +471,7 @@ class _ClamdGeneric(object):
else:
# Python3
assert isinstance(buffer_to_test, bytes) or isinstance(buffer_to_test, bytearray), 'Wrong type fom [buffer_to_test], should be bytes or bytearray [was {0}]'.format(type(buffer_to_test))
-
+
try:
self._init_socket()
self._send_command('INSTREAM')
@@ -478,10 +479,12 @@ class _ClamdGeneric(object):
except socket.error:
raise ConnectionError('Unable to scan stream')
- max_chunk_size = 1024 # MUST be < StreamMaxLength in /etc/clamav/clamd.conf or /etc/clamd.conf
+ # MUST be < StreamMaxLength in /etc/clamav/clamd.conf
+ # or /etc/clamd.conf
+ max_chunk_size = 1024
for n in range(1 + int(len(buffer_to_test)/max_chunk_size)):
- chunk = buffer_to_test[n*max_chunk_size:(n+1)*max_chunk_size]
+ chunk = buffer_to_test[n*max_chunk_size:(n+1)*max_chunk_size]
size = struct.pack('!L', len(chunk))
try:
self.clamd_socket.send(size)
@@ -492,9 +495,8 @@ class _ClamdGeneric(object):
# Terminating stream
self.clamd_socket.send(struct.pack('!L', 0))
-
result='...'
- dr={}
+ dr = {}
while result:
try:
result = self._recv_response()
@@ -502,15 +504,15 @@ class _ClamdGeneric(object):
raise ConnectionError('Unable to scan stream')
if len(result) > 0:
-
+
if result == 'INSTREAM size limit exceeded. ERROR':
raise BufferTooLongError(result)
filename, reason, status = self._parse_response(result)
-
+
if status == 'ERROR':
dr[filename] = ('ERROR', '{0}'.format(reason))
-
+
elif status == 'FOUND':
dr[filename] = ('FOUND', '{0}'.format(reason))
@@ -536,13 +538,25 @@ class _ClamdGeneric(object):
self.clamd_socket.send(cmd)
return
-
-
def _recv_response(self):
"""
receive response from clamd and strip all whitespace characters
"""
- data = self.clamd_socket.recv(4096)
+ # If we read from the socket too soon after connecting,
+ # we sometimes get a connection error,
+ # so we retry a few times before giving up
+ failed_count = 5
+ while True:
+ try:
+ data = self.clamd_socket.recv(4096)
+ except socket.error:
+ time.sleep(0.01)
+ failed_count -= 1
+ if failed_count == 0:
+ raise
+ else:
+ break
+
try:
response = bytes.decode(data).strip()
except UnicodeDecodeError:
@@ -559,15 +573,7 @@ class _ClamdGeneric(object):
response = ''
c = '...'
while c != '':
- try:
- data = self.clamd_socket.recv(4096)
- try:
- c = bytes.decode(data).strip()
- except UnicodeDecodeError:
- response = data.strip()
- except socket.error:
- break
-
+ c = self._recv_response()
response += '{0}\n'.format(c)
return response
diff --git a/pyclamd/test_pyclamd.py b/pyclamd/test_pyclamd.py
new file mode 100644
index 0000000..9c7435a
--- /dev/null
+++ b/pyclamd/test_pyclamd.py
@@ -0,0 +1,231 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import pyclamd
+import sys
+import datetime
+import os
+
+from nose.tools import assert_equals
+from nose.tools import raises
+from nose import with_setup
+
+from multiprocessing import Value
+
+"""
+test_pyclamd.py - test cases for pyclamd
+
+Source code : https://bitbucket.org/xael/pyclamd
+
+Author :
+
+* Alexandre Norman - norman at xael.org
+
+
+Licence : GPL v3 or any later version
+
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+
+##########################################################################################
+
+"""
+This plugin provides ``--pdb``, ``--pdb-failures`` and ``--pdb-errors``. The
+``--pdb`` option drops the test runner into pdb on an error or a failure; use
+``--pdb-failures`` or ``--pdb-errors`` to drop into pdb on only one of them.
+"""
+
+import pdb
+from nose.plugins.base import Plugin
+
+class Pdb(Plugin):
+ """
+ Provides --pdb, --pdb-failures and --pdb-errors options that cause the test
+ runner to drop into pdb on errors or failures, failures only, or errors only.
+ """
+ enabled_for_errors = False
+ enabled_for_failures = False
+ score = 5 # run last, among builtins
+
+ def options(self, parser, env):
+ """Register commandline options.
+ """
+ parser.add_option(
+ "--pdb", action="store_true", dest="debugBoth",
+ default=env.get('NOSE_PDB', False),
+ help="Drop into debugger on failures or errors")
+ parser.add_option(
+ "--pdb-failures", action="store_true",
+ dest="debugFailures",
+ default=env.get('NOSE_PDB_FAILURES', False),
+ help="Drop into debugger on failures")
+ parser.add_option(
+ "--pdb-errors", action="store_true",
+ dest="debugErrors",
+ default=env.get('NOSE_PDB_ERRORS', False),
+ help="Drop into debugger on errors")
+
+ def configure(self, options, conf):
+ """Configure which kinds of exceptions trigger plugin.
+ """
+ self.conf = conf
+ self.enabled_for_errors = options.debugErrors or options.debugBoth
+ self.enabled_for_failures = options.debugFailures or options.debugBoth
+ self.enabled = self.enabled_for_failures or self.enabled_for_errors
+
+ def addError(self, test, err):
+ """Enter pdb if configured to debug errors.
+ """
+ if not self.enabled_for_errors:
+ return
+ self.debug(err)
+
+ def addFailure(self, test, err):
+ """Enter pdb if configured to debug failures.
+ """
+ if not self.enabled_for_failures:
+ return
+ self.debug(err)
+
+ def debug(self, err):
+ import sys # FIXME why is this import here?
+ ec, ev, tb = err
+ stdout = sys.stdout
+ sys.stdout = sys.__stdout__
+ try:
+ pdb.post_mortem(tb)
+ finally:
+ sys.stdout = stdout
+
+##########################################################################################
+
+def setup_module():
+ global cd
+ cd = pyclamd.ClamdAgnostic()
+ return
+
+def teardown_module():
+ os.remove('/tmp/EICAR')
+ os.remove('/tmp/NO_EICAR')
+ os.remove('/tmp/EICAR-éèô请收藏我们的网址')
+ return
+
+def test_ping():
+ """
+ Tests pinging clamd
+ """
+ assert(cd.ping())
+
+
+def test_version():
+ """
+ Tests version
+ """
+ assert_equals(cd.version().split()[0], 'ClamAV')
+
+def test_reloading_base():
+ """
+ Reloads clamd database
+ """
+ assert_equals(cd.reload(), 'RELOADING')
+
+
+def test_stats():
+ """
+ Checks stats
+ """
+ assert_equals(cd.stats().split()[0], 'POOLS:')
+
+
+def test_eicar():
+ """
+ Tests eicar infected file
+ """
+ void = open('/tmp/EICAR','wb').write(cd.EICAR())
+ assert_equals(cd.scan_file('/tmp/EICAR')['/tmp/EICAR'], ('FOUND', 'Eicar-Test-Signature'))
+
+def test_no_eicar():
+ """
+ Tests standard non infected file
+ """
+ void = open('/tmp/NO_EICAR','w').write('no virus in this file')
+ assert(cd.scan_file('/tmp/NO_EICAR') is None)
+
+def test_stream():
+ """
+ Tests eicar infected stream
+ """
+ assert_equals(cd.scan_stream(cd.EICAR())['stream'], ('FOUND', 'Eicar-Test-Signature'))
+
+
+def test_directory_scanning():
+ """
+ Tests directory scanning with eicar infected file
+ """
+ directory = cd.contscan_file('/tmp/')
+ assert_equals(directory['/tmp/EICAR'], ('FOUND', 'Eicar-Test-Signature'))
+
+def test_multiscan_file():
+ """
+ Tests multiscan file scanning with eicar infected file
+ """
+ directory = cd.multiscan_file('/tmp/')
+ assert_equals(directory['/tmp/EICAR'], ('FOUND', 'Eicar-Test-Signature'))
+
+
+def test_unicode_scanning():
+ """
+ Tests encoding with non latin characters
+ (Chinese ideograms taken from a random site; I don't know what they mean, sorry)
+ """
+ void = open('/tmp/EICAR-éèô请收藏我们的网址','wb').write(cd.EICAR())
+ r = cd.scan_file('/tmp/EICAR-éèô请收藏我们的网址')
+ assert_equals(list(r.keys())[0], '/tmp/EICAR-éèô请收藏我们的网址')
+ assert_equals(r['/tmp/EICAR-éèô请收藏我们的网址'], ('FOUND', 'Eicar-Test-Signature'))
+
+
+
+def test_scan_stream_unicode_test_eicar_in_pdf():
+ """
+ Tests stream scan with eicar in pdf file. Not detected by design of ClamAv.
+ """
+ file_data = open('./probleme_data.pdf', 'rb').read()
+ v = cd.scan_stream(file_data)
+ assert_equals(v, None)
+
+
+def test_scan_stream_unicode_test_clean():
+ """
+ Tests stream scan with clean pdf file
+ """
+ file_data = open('./probleme_data_clean.pdf', 'rb').read()
+ v = cd.scan_stream(file_data)
+ assert_equals(v, None)
+ return
+
+
+def test_scan_file_unicode_test_eicar_in_pdf():
+ """
+ Tests file scan with eicar in pdf file. Not detected by design of ClamAv.
+ """
+ v = cd.scan_file('/home/xael/ESPACE_KM/python/pyclamd/probleme_data.pdf')
+ #.assertEqual(v, {u'stream': ('FOUND', 'Eicar-Test-Signature')})
+ assert_equals(v, None)
+ return
+
+
+##########################################################################################
diff --git a/setup.cfg b/setup.cfg
index 861a9f5..72f9d44 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,5 +1,5 @@
[egg_info]
-tag_build =
-tag_date = 0
tag_svn_revision = 0
+tag_date = 0
+tag_build =
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/pyclamd.git
More information about the Python-modules-commits
mailing list