[Python-modules-commits] [pyclamd] 02/04: Import pyclamd_0.3.17.orig.tar.gz

Scott Kitterman kitterman at moszumanska.debian.org
Thu Nov 17 21:32:27 UTC 2016


This is an automated email from the git hooks/post-receive script.

kitterman pushed a commit to branch master
in repository pyclamd.

commit 520d22dc7c9944090d90caf37de15e6f231eccac
Author: Scott Kitterman <scott at kitterman.com>
Date:   Thu Nov 17 16:30:09 2016 -0500

    Import pyclamd_0.3.17.orig.tar.gz
---
 PKG-INFO                     |   2 +-
 pyClamd.egg-info/PKG-INFO    |   2 +-
 pyClamd.egg-info/SOURCES.txt |   3 +-
 pyclamd/pyclamd.py           |  48 +++++----
 pyclamd/test_pyclamd.py      | 231 +++++++++++++++++++++++++++++++++++++++++++
 setup.cfg                    |   4 +-
 6 files changed, 264 insertions(+), 26 deletions(-)

diff --git a/PKG-INFO b/PKG-INFO
index 11321ed..3bc6132 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: pyClamd
-Version: 0.3.16
+Version: 0.3.17
 Summary: pyClamd is a python interface to Clamd (Clamav daemon).
 Home-page: http://xael.org/pages/pyclamd-en.html
 Author: Alexandre Norman
diff --git a/pyClamd.egg-info/PKG-INFO b/pyClamd.egg-info/PKG-INFO
index 11321ed..3bc6132 100644
--- a/pyClamd.egg-info/PKG-INFO
+++ b/pyClamd.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: pyClamd
-Version: 0.3.16
+Version: 0.3.17
 Summary: pyClamd is a python interface to Clamd (Clamav daemon).
 Home-page: http://xael.org/pages/pyclamd-en.html
 Author: Alexandre Norman
diff --git a/pyClamd.egg-info/SOURCES.txt b/pyClamd.egg-info/SOURCES.txt
index d9647e3..fd7509a 100644
--- a/pyClamd.egg-info/SOURCES.txt
+++ b/pyClamd.egg-info/SOURCES.txt
@@ -5,4 +5,5 @@ pyClamd.egg-info/SOURCES.txt
 pyClamd.egg-info/dependency_links.txt
 pyClamd.egg-info/top_level.txt
 pyclamd/__init__.py
-pyclamd/pyclamd.py
\ No newline at end of file
+pyclamd/pyclamd.py
+pyclamd/test_pyclamd.py
\ No newline at end of file
diff --git a/pyclamd/pyclamd.py b/pyclamd/pyclamd.py
index 0a9ca9c..9fea4e8 100644
--- a/pyclamd/pyclamd.py
+++ b/pyclamd/pyclamd.py
@@ -129,7 +129,7 @@ True
 >>> os.remove('/tmp/EICAR-éèô请收藏我们的网址')
 """
 
-__version__ = "0.3.16"
+__version__ = "0.3.17"
 
 
 # $Source$
@@ -140,6 +140,7 @@ import sys
 import socket
 import struct
 import base64
+import time
 
 ############################################################################
 
@@ -470,7 +471,7 @@ class _ClamdGeneric(object):
         else:
             # Python3
             assert isinstance(buffer_to_test, bytes) or isinstance(buffer_to_test, bytearray), 'Wrong type fom [buffer_to_test], should be bytes or bytearray [was {0}]'.format(type(buffer_to_test))
-            
+
         try:
             self._init_socket()
             self._send_command('INSTREAM')
@@ -478,10 +479,12 @@ class _ClamdGeneric(object):
         except socket.error:
             raise ConnectionError('Unable to scan stream')
 
-        max_chunk_size = 1024 # MUST be < StreamMaxLength in /etc/clamav/clamd.conf or /etc/clamd.conf
+        # MUST be < StreamMaxLength in /etc/clamav/clamd.conf
+        # or /etc/clamd.conf
+        max_chunk_size = 1024
 
         for n in range(1 + int(len(buffer_to_test)/max_chunk_size)):
-            chunk = buffer_to_test[n*max_chunk_size:(n+1)*max_chunk_size]            
+            chunk = buffer_to_test[n*max_chunk_size:(n+1)*max_chunk_size]
             size = struct.pack('!L', len(chunk))
             try:
                 self.clamd_socket.send(size)
@@ -492,9 +495,8 @@ class _ClamdGeneric(object):
             # Terminating stream
             self.clamd_socket.send(struct.pack('!L', 0))
 
-
         result='...'
-        dr={}
+        dr = {}
         while result:
             try:
                 result = self._recv_response()
@@ -502,15 +504,15 @@ class _ClamdGeneric(object):
                 raise ConnectionError('Unable to scan stream')
 
             if len(result) > 0:
-                
+
                 if result == 'INSTREAM size limit exceeded. ERROR':
                     raise BufferTooLongError(result)
 
                 filename, reason, status = self._parse_response(result)
-               
+
                 if status == 'ERROR':
                     dr[filename] = ('ERROR', '{0}'.format(reason))
-                    
+
                 elif status == 'FOUND':
                     dr[filename] = ('FOUND', '{0}'.format(reason))
 
@@ -536,13 +538,25 @@ class _ClamdGeneric(object):
         self.clamd_socket.send(cmd)
         return
 
-    
-
     def _recv_response(self):
         """
         receive response from clamd and strip all whitespace characters
         """
-        data = self.clamd_socket.recv(4096)
+        # If we read too quickly after connecting,
+        # we sometimes get a connection error,
+        # so we retry a few times
+        failed_count = 5
+        while True:
+            try:
+                data = self.clamd_socket.recv(4096)
+            except socket.error:
+                time.sleep(0.01)
+                failed_count -= 1
+                if failed_count == 0:
+                    raise
+            else:
+                break
+
         try:
             response = bytes.decode(data).strip()
         except UnicodeDecodeError:
@@ -559,15 +573,7 @@ class _ClamdGeneric(object):
         response = ''
         c = '...'
         while c != '':
-            try:
-                data = self.clamd_socket.recv(4096)
-                try:
-                    c = bytes.decode(data).strip()
-                except UnicodeDecodeError:
-                    response = data.strip()
-            except socket.error:
-                break
-                
+            c = self._recv_response()
             response += '{0}\n'.format(c)
         return response
 
diff --git a/pyclamd/test_pyclamd.py b/pyclamd/test_pyclamd.py
new file mode 100644
index 0000000..9c7435a
--- /dev/null
+++ b/pyclamd/test_pyclamd.py
@@ -0,0 +1,231 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import pyclamd
+import sys
+import datetime
+import os
+
+from nose.tools import assert_equals
+from nose.tools import raises
+from nose import with_setup
+
+from multiprocessing import Value
+
+"""
+test_pyclamd.py - test cases for pyclamd
+
+Source code : https://bitbucket.org/xael/pyclamd
+
+Author :
+
+* Alexandre Norman - norman at xael.org
+
+ 
+Licence : GPL v3 or any later version
+
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+
+##########################################################################################
+
+"""
+This plugin provides ``--pdb`` and ``--pdb-failures`` options. The ``--pdb``
+option will drop the test runner into pdb when it encounters an error. To
+drop into pdb on failure, use ``--pdb-failures``.
+"""
+
+import pdb
+from nose.plugins.base import Plugin
+
+class Pdb(Plugin):
+    """
+    Provides --pdb and --pdb-failures options that cause the test runner to
+    drop into pdb if it encounters an error or failure, respectively.
+    """
+    enabled_for_errors = False
+    enabled_for_failures = False
+    score = 5 # run last, among builtins
+    
+    def options(self, parser, env):
+        """Register commandline options.
+        """
+        parser.add_option(
+            "--pdb", action="store_true", dest="debugBoth",
+            default=env.get('NOSE_PDB', False),
+            help="Drop into debugger on failures or errors")
+        parser.add_option(
+            "--pdb-failures", action="store_true",
+            dest="debugFailures",
+            default=env.get('NOSE_PDB_FAILURES', False),
+            help="Drop into debugger on failures")
+        parser.add_option(
+            "--pdb-errors", action="store_true",
+            dest="debugErrors",
+            default=env.get('NOSE_PDB_ERRORS', False),
+            help="Drop into debugger on errors")
+
+    def configure(self, options, conf):
+        """Configure which kinds of exceptions trigger plugin.
+        """
+        self.conf = conf
+        self.enabled_for_errors = options.debugErrors or options.debugBoth
+        self.enabled_for_failures = options.debugFailures or options.debugBoth
+        self.enabled = self.enabled_for_failures or self.enabled_for_errors
+
+    def addError(self, test, err):
+        """Enter pdb if configured to debug errors.
+        """
+        if not self.enabled_for_errors:
+            return
+        self.debug(err)
+
+    def addFailure(self, test, err):
+        """Enter pdb if configured to debug failures.
+        """
+        if not self.enabled_for_failures:
+            return
+        self.debug(err)
+
+    def debug(self, err):
+        import sys # FIXME why is this import here?
+        ec, ev, tb = err
+        stdout = sys.stdout
+        sys.stdout = sys.__stdout__
+        try:
+            pdb.post_mortem(tb)
+        finally:
+            sys.stdout = stdout
+
+##########################################################################################
+
+def setup_module():
+    global cd
+    cd = pyclamd.ClamdAgnostic()
+    return
+
+def teardown_module():
+    os.remove('/tmp/EICAR')
+    os.remove('/tmp/NO_EICAR')
+    os.remove('/tmp/EICAR-éèô请收藏我们的网址')
+    return
+
+def test_ping():
+    """
+    Tests pinging clamd
+    """
+    assert(cd.ping())
+
+
+def test_version():
+    """
+    Tests version
+    """
+    assert_equals(cd.version().split()[0], 'ClamAV')
+
+def test_reloading_base():
+    """
+    Reloads clamd database
+    """
+    assert_equals(cd.reload(), 'RELOADING')
+
+
+def test_stats():
+    """
+    Checks stats
+    """
+    assert_equals(cd.stats().split()[0], 'POOLS:')
+
+
+def test_eicar():
+    """
+    Tests eicar infected file
+    """
+    void = open('/tmp/EICAR','wb').write(cd.EICAR())
+    assert_equals(cd.scan_file('/tmp/EICAR')['/tmp/EICAR'], ('FOUND', 'Eicar-Test-Signature'))
+
+def test_no_eicar():
+    """
+    Tests standard non infected file
+    """
+    void = open('/tmp/NO_EICAR','w').write('no virus in this file')
+    assert(cd.scan_file('/tmp/NO_EICAR') is None)
+
+def test_stream():
+    """
+    Tests eicar infected stream    
+    """
+    assert_equals(cd.scan_stream(cd.EICAR())['stream'], ('FOUND', 'Eicar-Test-Signature'))
+
+
+def test_directory_scanning():
+    """
+    Tests directory scanning with eicar infected file
+    """
+    directory = cd.contscan_file('/tmp/')
+    assert_equals(directory['/tmp/EICAR'], ('FOUND', 'Eicar-Test-Signature'))
+
+def test_multiscan_file():
+    """
+    Tests multiscan file scanning with eicar infected file
+    """
+    directory = cd.multiscan_file('/tmp/')
+    assert_equals(directory['/tmp/EICAR'], ('FOUND', 'Eicar-Test-Signature'))
+
+    
+def test_unicode_scanning():
+    """
+    Tests encoding with non latin characters 
+    (Chinese ideograms taken from a random site; I don't know what they mean, sorry)
+    """
+    void = open('/tmp/EICAR-éèô请收藏我们的网址','wb').write(cd.EICAR())
+    r = cd.scan_file('/tmp/EICAR-éèô请收藏我们的网址')
+    assert_equals(list(r.keys())[0], '/tmp/EICAR-éèô请收藏我们的网址')
+    assert_equals(r['/tmp/EICAR-éèô请收藏我们的网址'], ('FOUND', 'Eicar-Test-Signature'))
+
+
+
+def test_scan_stream_unicode_test_eicar_in_pdf():
+    """
+    Tests stream scan with eicar in pdf file. Not detected by design of ClamAv. 
+    """
+    file_data = open('./probleme_data.pdf', 'rb').read()
+    v = cd.scan_stream(file_data)
+    assert_equals(v, None)
+
+
+def test_scan_stream_unicode_test_clean():
+    """
+    Tests stream scan with clean pdf file 
+    """
+    file_data = open('./probleme_data_clean.pdf', 'rb').read()
+    v = cd.scan_stream(file_data)
+    assert_equals(v, None)
+    return
+
+
+def test_scan_file_unicode_test_eicar_in_pdf():
+    """
+    Tests file scan with eicar in pdf file. Not detected by design of ClamAv.
+    """
+    v = cd.scan_file('/home/xael/ESPACE_KM/python/pyclamd/probleme_data.pdf')
+    #.assertEqual(v, {u'stream': ('FOUND', 'Eicar-Test-Signature')})
+    assert_equals(v, None)
+    return
+
+
+##########################################################################################
diff --git a/setup.cfg b/setup.cfg
index 861a9f5..72f9d44 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,5 +1,5 @@
 [egg_info]
-tag_build = 
-tag_date = 0
 tag_svn_revision = 0
+tag_date = 0
+tag_build = 
 

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/pyclamd.git



More information about the Python-modules-commits mailing list