[med-svn] [python-bz2file] 01/02: Imported Upstream version 0.98
Michael Crusoe
misterc-guest at moszumanska.debian.org
Thu May 28 02:48:30 UTC 2015
This is an automated email from the git hooks/post-receive script.
misterc-guest pushed a commit to branch master
in repository python-bz2file.
commit f8610188fc2c01b03192544bd5740b5c5e5ab5b4
Author: Michael R. Crusoe <mcrusoe at msu.edu>
Date: Wed May 27 21:43:01 2015 -0400
Imported Upstream version 0.98
---
PKG-INFO | 85 +++++++
README.rst | 59 +++++
bz2file.py | 499 ++++++++++++++++++++++++++++++++++++++
setup.py | 34 +++
test_bz2file.py | 732 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 1409 insertions(+)
diff --git a/PKG-INFO b/PKG-INFO
new file mode 100644
index 0000000..90b8d79
--- /dev/null
+++ b/PKG-INFO
@@ -0,0 +1,85 @@
+Metadata-Version: 1.1
+Name: bz2file
+Version: 0.98
+Summary: Read and write bzip2-compressed files.
+Home-page: https://github.com/nvawda/bz2file
+Author: Nadeem Vawda
+Author-email: nadeem.vawda at gmail.com
+License: Apache License, Version 2.0
+Description: Bz2file is a Python library for reading and writing bzip2-compressed files.
+
+ It contains a drop-in replacement for the file interface in the standard
+ library's ``bz2`` module, including features from the latest development
+ version of CPython that are not available in older releases.
+
+ Bz2file is compatible with CPython 2.6, 2.7, and 3.0 through 3.4, as well as
+ PyPy 2.0.
+
+
+ Features
+ --------
+
+ - Supports multi-stream files.
+
+ - Can read from or write to any file-like object.
+
+ - Can open files in either text or binary mode.
+
+ - Added methods: ``peek()``, ``read1()``, ``readinto()``, ``fileno()``,
+ ``readable()``, ``writable()``, ``seekable()``.
+
+
+ Installation
+ ------------
+
+ To install bz2file, run: ::
+
+ $ pip install bz2file
+
+
+ Documentation
+ -------------
+
+ The ``open()`` function and ``BZ2File`` class in this module provide the same
+ features and interface as the ones in the standard library's ``bz2`` module in
+ the current development version of CPython, `documented here
+ <http://docs.python.org/dev/library/bz2.html>`_.
+
+
+ Version History
+ ---------------
+
+ 0.98: 19 January 2014
+
+ - Added support for the 'x' family of modes.
+ - Ignore non-bz2 data at the end of a file, rather than raising an exception.
+ - Tests now pass on PyPy.
+
+ 0.95: 08 October 2012
+
+ - Added the ``open()`` function.
+ - Improved performance when reading in small chunks.
+ - Removed the ``fileobj`` argument to ``BZ2File()``. To wrap an existing file
+ object, pass it as the first argument (``filename``).
+
+ 0.9: 04 February 2012
+
+ - Initial release.
+
+Platform: UNKNOWN
+Classifier: Development Status :: 4 - Beta
+Classifier: Intended Audience :: Developers
+Classifier: License :: OSI Approved :: Apache Software License
+Classifier: Operating System :: OS Independent
+Classifier: Programming Language :: Python
+Classifier: Programming Language :: Python :: 2
+Classifier: Programming Language :: Python :: 2.6
+Classifier: Programming Language :: Python :: 2.7
+Classifier: Programming Language :: Python :: 3
+Classifier: Programming Language :: Python :: 3.0
+Classifier: Programming Language :: Python :: 3.1
+Classifier: Programming Language :: Python :: 3.2
+Classifier: Programming Language :: Python :: 3.3
+Classifier: Programming Language :: Python :: 3.4
+Classifier: Topic :: Software Development :: Libraries :: Python Modules
+Classifier: Topic :: System :: Archiving :: Compression
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..2e848ef
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,59 @@
+Bz2file is a Python library for reading and writing bzip2-compressed files.
+
+It contains a drop-in replacement for the file interface in the standard
+library's ``bz2`` module, including features from the latest development
+version of CPython that are not available in older releases.
+
+Bz2file is compatible with CPython 2.6, 2.7, and 3.0 through 3.4, as well as
+PyPy 2.0.
+
+
+Features
+--------
+
+- Supports multi-stream files.
+
+- Can read from or write to any file-like object.
+
+- Can open files in either text or binary mode.
+
+- Added methods: ``peek()``, ``read1()``, ``readinto()``, ``fileno()``,
+ ``readable()``, ``writable()``, ``seekable()``.
+
+
+Installation
+------------
+
+To install bz2file, run: ::
+
+ $ pip install bz2file
+
+
+Documentation
+-------------
+
+The ``open()`` function and ``BZ2File`` class in this module provide the same
+features and interface as the ones in the standard library's ``bz2`` module in
+the current development version of CPython, `documented here
+<http://docs.python.org/dev/library/bz2.html>`_.
+
+
+Version History
+---------------
+
+0.98: 19 January 2014
+
+- Added support for the 'x' family of modes.
+- Ignore non-bz2 data at the end of a file, rather than raising an exception.
+- Tests now pass on PyPy.
+
+0.95: 08 October 2012
+
+- Added the ``open()`` function.
+- Improved performance when reading in small chunks.
+- Removed the ``fileobj`` argument to ``BZ2File()``. To wrap an existing file
+ object, pass it as the first argument (``filename``).
+
+0.9: 04 February 2012
+
+- Initial release.
diff --git a/bz2file.py b/bz2file.py
new file mode 100644
index 0000000..1d91195
--- /dev/null
+++ b/bz2file.py
@@ -0,0 +1,499 @@
+"""Module for reading and writing bzip2-compressed files.
+
+This module contains a backport of Python 3.4's bz2.open() function and
+BZ2File class, adapted to work with earlier versions of Python.
+"""
+
+__all__ = ["BZ2File", "open"]
+
+__author__ = "Nadeem Vawda <nadeem.vawda at gmail.com>"
+
+import io
+import sys
+import warnings
+
+try:
+ from threading import RLock
+except ImportError:
+ from dummy_threading import RLock
+
+from bz2 import BZ2Compressor, BZ2Decompressor
+
+
+_MODE_CLOSED = 0
+_MODE_READ = 1
+_MODE_READ_EOF = 2
+_MODE_WRITE = 3
+
+_BUFFER_SIZE = 8192
+
+_STR_TYPES = (str, unicode) if (str is bytes) else (str, bytes)
+
+# The 'x' mode for open() was introduced in Python 3.3.
+_HAS_OPEN_X_MODE = sys.version_info[:2] >= (3, 3)
+
+_builtin_open = open
+
+
+class BZ2File(io.BufferedIOBase):
+
+ """A file object providing transparent bzip2 (de)compression.
+
+ A BZ2File can act as a wrapper for an existing file object, or refer
+ directly to a named file on disk.
+
+ Note that BZ2File provides a *binary* file interface - data read is
+ returned as bytes, and data to be written should be given as bytes.
+ """
+
+ def __init__(self, filename, mode="r", buffering=None, compresslevel=9):
+ """Open a bzip2-compressed file.
+
+ If filename is a str, bytes or unicode object, it gives the name
+ of the file to be opened. Otherwise, it should be a file object,
+ which will be used to read or write the compressed data.
+
+ mode can be 'r' for reading (default), 'w' for (over)writing,
+ 'x' for creating exclusively, or 'a' for appending. These can
+ equivalently be given as 'rb', 'wb', 'xb', and 'ab'.
+
+ buffering is ignored. Its use is deprecated.
+
+ If mode is 'w', 'x' or 'a', compresslevel can be a number between 1
+ and 9 specifying the level of compression: 1 produces the least
+ compression, and 9 (default) produces the most compression.
+
+ If mode is 'r', the input file may be the concatenation of
+ multiple compressed streams.
+ """
+ # This lock must be recursive, so that BufferedIOBase's
+ # readline(), readlines() and writelines() don't deadlock.
+ self._lock = RLock()
+ self._fp = None
+ self._closefp = False
+ self._mode = _MODE_CLOSED
+ self._pos = 0
+ self._size = -1
+
+ if buffering is not None:
+ warnings.warn("Use of 'buffering' argument is deprecated",
+ DeprecationWarning)
+
+ if not (1 <= compresslevel <= 9):
+ raise ValueError("compresslevel must be between 1 and 9")
+
+ if mode in ("", "r", "rb"):
+ mode = "rb"
+ mode_code = _MODE_READ
+ self._decompressor = BZ2Decompressor()
+ self._buffer = b""
+ self._buffer_offset = 0
+ elif mode in ("w", "wb"):
+ mode = "wb"
+ mode_code = _MODE_WRITE
+ self._compressor = BZ2Compressor(compresslevel)
+ elif mode in ("x", "xb") and _HAS_OPEN_X_MODE:
+ mode = "xb"
+ mode_code = _MODE_WRITE
+ self._compressor = BZ2Compressor(compresslevel)
+ elif mode in ("a", "ab"):
+ mode = "ab"
+ mode_code = _MODE_WRITE
+ self._compressor = BZ2Compressor(compresslevel)
+ else:
+ raise ValueError("Invalid mode: %r" % (mode,))
+
+ if isinstance(filename, _STR_TYPES):
+ self._fp = _builtin_open(filename, mode)
+ self._closefp = True
+ self._mode = mode_code
+ elif hasattr(filename, "read") or hasattr(filename, "write"):
+ self._fp = filename
+ self._mode = mode_code
+ else:
+ raise TypeError("filename must be a %s or %s object, or a file" %
+ (_STR_TYPES[0].__name__, _STR_TYPES[1].__name__))
+
+ def close(self):
+ """Flush and close the file.
+
+ May be called more than once without error. Once the file is
+ closed, any other operation on it will raise a ValueError.
+ """
+ with self._lock:
+ if self._mode == _MODE_CLOSED:
+ return
+ try:
+ if self._mode in (_MODE_READ, _MODE_READ_EOF):
+ self._decompressor = None
+ elif self._mode == _MODE_WRITE:
+ self._fp.write(self._compressor.flush())
+ self._compressor = None
+ finally:
+ try:
+ if self._closefp:
+ self._fp.close()
+ finally:
+ self._fp = None
+ self._closefp = False
+ self._mode = _MODE_CLOSED
+ self._buffer = b""
+ self._buffer_offset = 0
+
+ @property
+ def closed(self):
+ """True if this file is closed."""
+ return self._mode == _MODE_CLOSED
+
+ def fileno(self):
+ """Return the file descriptor for the underlying file."""
+ self._check_not_closed()
+ return self._fp.fileno()
+
+ def seekable(self):
+ """Return whether the file supports seeking."""
+ return self.readable() and (self._fp.seekable()
+ if hasattr(self._fp, "seekable")
+ else hasattr(self._fp, "seek"))
+
+ def readable(self):
+ """Return whether the file was opened for reading."""
+ self._check_not_closed()
+ return self._mode in (_MODE_READ, _MODE_READ_EOF)
+
+ def writable(self):
+ """Return whether the file was opened for writing."""
+ self._check_not_closed()
+ return self._mode == _MODE_WRITE
+
+ # Mode-checking helper functions.
+
+ def _check_not_closed(self):
+ if self.closed:
+ raise ValueError("I/O operation on closed file")
+
+ def _check_can_read(self):
+ if self._mode not in (_MODE_READ, _MODE_READ_EOF):
+ self._check_not_closed()
+ raise io.UnsupportedOperation("File not open for reading")
+
+ def _check_can_write(self):
+ if self._mode != _MODE_WRITE:
+ self._check_not_closed()
+ raise io.UnsupportedOperation("File not open for writing")
+
+ def _check_can_seek(self):
+ if self._mode not in (_MODE_READ, _MODE_READ_EOF):
+ self._check_not_closed()
+ raise io.UnsupportedOperation("Seeking is only supported "
+ "on files open for reading")
+ if hasattr(self._fp, "seekable") and not self._fp.seekable():
+ raise io.UnsupportedOperation("The underlying file object "
+ "does not support seeking")
+
+ # Fill the readahead buffer if it is empty. Returns False on EOF.
+ def _fill_buffer(self):
+ if self._mode == _MODE_READ_EOF:
+ return False
+ # Depending on the input data, our call to the decompressor may not
+ # return any data. In this case, try again after reading another block.
+ while self._buffer_offset == len(self._buffer):
+ rawblock = (self._decompressor.unused_data or
+ self._fp.read(_BUFFER_SIZE))
+
+ if not rawblock:
+ try:
+ self._decompressor.decompress(b"")
+ except EOFError:
+ # End-of-stream marker and end of file. We're good.
+ self._mode = _MODE_READ_EOF
+ self._size = self._pos
+ return False
+ else:
+ # Problem - we were expecting more compressed data.
+ raise EOFError("Compressed file ended before the "
+ "end-of-stream marker was reached")
+
+ try:
+ self._buffer = self._decompressor.decompress(rawblock)
+ except EOFError:
+ # Continue to next stream.
+ self._decompressor = BZ2Decompressor()
+ try:
+ self._buffer = self._decompressor.decompress(rawblock)
+ except IOError:
+ # Trailing data isn't a valid bzip2 stream. We're done here.
+ self._mode = _MODE_READ_EOF
+ self._size = self._pos
+ return False
+ self._buffer_offset = 0
+ return True
+
+ # Read data until EOF.
+ # If return_data is false, consume the data without returning it.
+ def _read_all(self, return_data=True):
+ # The loop assumes that _buffer_offset is 0. Ensure that this is true.
+ self._buffer = self._buffer[self._buffer_offset:]
+ self._buffer_offset = 0
+
+ blocks = []
+ while self._fill_buffer():
+ if return_data:
+ blocks.append(self._buffer)
+ self._pos += len(self._buffer)
+ self._buffer = b""
+ if return_data:
+ return b"".join(blocks)
+
+ # Read a block of up to n bytes.
+ # If return_data is false, consume the data without returning it.
+ def _read_block(self, n, return_data=True):
+ # If we have enough data buffered, return immediately.
+ end = self._buffer_offset + n
+ if end <= len(self._buffer):
+ data = self._buffer[self._buffer_offset : end]
+ self._buffer_offset = end
+ self._pos += len(data)
+ return data if return_data else None
+
+ # The loop assumes that _buffer_offset is 0. Ensure that this is true.
+ self._buffer = self._buffer[self._buffer_offset:]
+ self._buffer_offset = 0
+
+ blocks = []
+ while n > 0 and self._fill_buffer():
+ if n < len(self._buffer):
+ data = self._buffer[:n]
+ self._buffer_offset = n
+ else:
+ data = self._buffer
+ self._buffer = b""
+ if return_data:
+ blocks.append(data)
+ self._pos += len(data)
+ n -= len(data)
+ if return_data:
+ return b"".join(blocks)
+
+ def peek(self, n=0):
+ """Return buffered data without advancing the file position.
+
+ Always returns at least one byte of data, unless at EOF.
+ The exact number of bytes returned is unspecified.
+ """
+ with self._lock:
+ self._check_can_read()
+ if not self._fill_buffer():
+ return b""
+ return self._buffer[self._buffer_offset:]
+
+ def read(self, size=-1):
+ """Read up to size uncompressed bytes from the file.
+
+ If size is negative or omitted, read until EOF is reached.
+ Returns b'' if the file is already at EOF.
+ """
+ if size is None:
+ raise TypeError()
+ with self._lock:
+ self._check_can_read()
+ if size == 0:
+ return b""
+ elif size < 0:
+ return self._read_all()
+ else:
+ return self._read_block(size)
+
+ def read1(self, size=-1):
+ """Read up to size uncompressed bytes, while trying to avoid
+ making multiple reads from the underlying stream.
+
+ Returns b'' if the file is at EOF.
+ """
+ # Usually, read1() calls _fp.read() at most once. However, sometimes
+ # this does not give enough data for the decompressor to make progress.
+ # In this case we make multiple reads, to avoid returning b"".
+ with self._lock:
+ self._check_can_read()
+ if (size == 0 or
+ # Only call _fill_buffer() if the buffer is actually empty.
+ # This gives a significant speedup if *size* is small.
+ (self._buffer_offset == len(self._buffer) and not self._fill_buffer())):
+ return b""
+ if size > 0:
+ data = self._buffer[self._buffer_offset :
+ self._buffer_offset + size]
+ self._buffer_offset += len(data)
+ else:
+ data = self._buffer[self._buffer_offset:]
+ self._buffer = b""
+ self._buffer_offset = 0
+ self._pos += len(data)
+ return data
+
+ def readinto(self, b):
+ """Read up to len(b) bytes into b.
+
+ Returns the number of bytes read (0 for EOF).
+ """
+ with self._lock:
+ return io.BufferedIOBase.readinto(self, b)
+
+ def readline(self, size=-1):
+ """Read a line of uncompressed bytes from the file.
+
+ The terminating newline (if present) is retained. If size is
+ non-negative, no more than size bytes will be read (in which
+ case the line may be incomplete). Returns b'' if already at EOF.
+ """
+ if not isinstance(size, int):
+ if not hasattr(size, "__index__"):
+ raise TypeError("Integer argument expected")
+ size = size.__index__()
+ with self._lock:
+ self._check_can_read()
+ # Shortcut for the common case - the whole line is in the buffer.
+ if size < 0:
+ end = self._buffer.find(b"\n", self._buffer_offset) + 1
+ if end > 0:
+ line = self._buffer[self._buffer_offset : end]
+ self._buffer_offset = end
+ self._pos += len(line)
+ return line
+ return io.BufferedIOBase.readline(self, size)
+
+ def readlines(self, size=-1):
+ """Read a list of lines of uncompressed bytes from the file.
+
+ size can be specified to control the number of lines read: no
+ further lines will be read once the total size of the lines read
+ so far equals or exceeds size.
+ """
+ if not isinstance(size, int):
+ if not hasattr(size, "__index__"):
+ raise TypeError("Integer argument expected")
+ size = size.__index__()
+ with self._lock:
+ return io.BufferedIOBase.readlines(self, size)
+
+ def write(self, data):
+ """Write a byte string to the file.
+
+ Returns the number of uncompressed bytes written, which is
+ always len(data). Note that due to buffering, the file on disk
+ may not reflect the data written until close() is called.
+ """
+ with self._lock:
+ self._check_can_write()
+ compressed = self._compressor.compress(data)
+ self._fp.write(compressed)
+ self._pos += len(data)
+ return len(data)
+
+ def writelines(self, seq):
+ """Write a sequence of byte strings to the file.
+
+ Returns the number of uncompressed bytes written.
+ seq can be any iterable yielding byte strings.
+
+ Line separators are not added between the written byte strings.
+ """
+ with self._lock:
+ return io.BufferedIOBase.writelines(self, seq)
+
+ # Rewind the file to the beginning of the data stream.
+ def _rewind(self):
+ self._fp.seek(0, 0)
+ self._mode = _MODE_READ
+ self._pos = 0
+ self._decompressor = BZ2Decompressor()
+ self._buffer = b""
+ self._buffer_offset = 0
+
+ def seek(self, offset, whence=0):
+ """Change the file position.
+
+ The new position is specified by offset, relative to the
+ position indicated by whence. Values for whence are:
+
+ 0: start of stream (default); offset must not be negative
+ 1: current stream position
+ 2: end of stream; offset must not be positive
+
+ Returns the new file position.
+
+ Note that seeking is emulated, so depending on the parameters,
+ this operation may be extremely slow.
+ """
+ with self._lock:
+ self._check_can_seek()
+
+ # Recalculate offset as an absolute file position.
+ if whence == 0:
+ pass
+ elif whence == 1:
+ offset = self._pos + offset
+ elif whence == 2:
+ # Seeking relative to EOF - we need to know the file's size.
+ if self._size < 0:
+ self._read_all(return_data=False)
+ offset = self._size + offset
+ else:
+ raise ValueError("Invalid value for whence: %s" % (whence,))
+
+ # Make it so that offset is the number of bytes to skip forward.
+ if offset < self._pos:
+ self._rewind()
+ else:
+ offset -= self._pos
+
+ # Read and discard data until we reach the desired position.
+ self._read_block(offset, return_data=False)
+
+ return self._pos
+
+ def tell(self):
+ """Return the current file position."""
+ with self._lock:
+ self._check_not_closed()
+ return self._pos
+
+
+def open(filename, mode="rb", compresslevel=9,
+ encoding=None, errors=None, newline=None):
+ """Open a bzip2-compressed file in binary or text mode.
+
+ The filename argument can be an actual filename (a str, bytes or unicode
+ object), or an existing file object to read from or write to.
+
+ The mode argument can be "r", "rb", "w", "wb", "x", "xb", "a" or
+ "ab" for binary mode, or "rt", "wt", "xt" or "at" for text mode.
+ The default mode is "rb", and the default compresslevel is 9.
+
+ For binary mode, this function is equivalent to the BZ2File
+ constructor: BZ2File(filename, mode, compresslevel). In this case,
+ the encoding, errors and newline arguments must not be provided.
+
+ For text mode, a BZ2File object is created, and wrapped in an
+ io.TextIOWrapper instance with the specified encoding, error
+ handling behavior, and line ending(s).
+
+ """
+ if "t" in mode:
+ if "b" in mode:
+ raise ValueError("Invalid mode: %r" % (mode,))
+ else:
+ if encoding is not None:
+ raise ValueError("Argument 'encoding' not supported in binary mode")
+ if errors is not None:
+ raise ValueError("Argument 'errors' not supported in binary mode")
+ if newline is not None:
+ raise ValueError("Argument 'newline' not supported in binary mode")
+
+ bz_mode = mode.replace("t", "")
+ binary_file = BZ2File(filename, bz_mode, compresslevel=compresslevel)
+
+ if "t" in mode:
+ return io.TextIOWrapper(binary_file, encoding, errors, newline)
+ else:
+ return binary_file
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..d31f317
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,34 @@
+from distutils.core import setup
+
+with open("README.rst") as f:
+ readme = f.read()
+
+setup(
+ name="bz2file",
+ version="0.98",
+ description="Read and write bzip2-compressed files.",
+ long_description=readme,
+ author="Nadeem Vawda",
+ author_email="nadeem.vawda at gmail.com",
+ url="https://github.com/nvawda/bz2file",
+ py_modules=["bz2file"],
+ license="Apache License, Version 2.0",
+ classifiers=[
+ "Development Status :: 4 - Beta",
+ "Intended Audience :: Developers",
+ "License :: OSI Approved :: Apache Software License",
+ "Operating System :: OS Independent",
+ "Programming Language :: Python",
+ "Programming Language :: Python :: 2",
+ "Programming Language :: Python :: 2.6",
+ "Programming Language :: Python :: 2.7",
+ "Programming Language :: Python :: 3",
+ "Programming Language :: Python :: 3.0",
+ "Programming Language :: Python :: 3.1",
+ "Programming Language :: Python :: 3.2",
+ "Programming Language :: Python :: 3.3",
+ "Programming Language :: Python :: 3.4",
+ "Topic :: Software Development :: Libraries :: Python Modules",
+ "Topic :: System :: Archiving :: Compression",
+ ],
+)
diff --git a/test_bz2file.py b/test_bz2file.py
new file mode 100644
index 0000000..7ad58ff
--- /dev/null
+++ b/test_bz2file.py
@@ -0,0 +1,732 @@
+import unittest
+import bz2
+import bz2file
+from bz2file import BZ2File
+from io import BytesIO
+import os
+import platform
+
+try:
+ import threading
+except ImportError:
+ threading = None
+
+try:
+ from test import support
+except ImportError:
+ from test import test_support as support
+
+
+class BaseTest(unittest.TestCase):
+ "Base for other testcases."
+
+ TEXT_LINES = [
+ b'root:x:0:0:root:/root:/bin/bash\n',
+ b'bin:x:1:1:bin:/bin:\n',
+ b'daemon:x:2:2:daemon:/sbin:\n',
+ b'adm:x:3:4:adm:/var/adm:\n',
+ b'lp:x:4:7:lp:/var/spool/lpd:\n',
+ b'sync:x:5:0:sync:/sbin:/bin/sync\n',
+ b'shutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\n',
+ b'halt:x:7:0:halt:/sbin:/sbin/halt\n',
+ b'mail:x:8:12:mail:/var/spool/mail:\n',
+ b'news:x:9:13:news:/var/spool/news:\n',
+ b'uucp:x:10:14:uucp:/var/spool/uucp:\n',
+ b'operator:x:11:0:operator:/root:\n',
+ b'games:x:12:100:games:/usr/games:\n',
+ b'gopher:x:13:30:gopher:/usr/lib/gopher-data:\n',
+ b'ftp:x:14:50:FTP User:/var/ftp:/bin/bash\n',
+ b'nobody:x:65534:65534:Nobody:/home:\n',
+ b'postfix:x:100:101:postfix:/var/spool/postfix:\n',
+ b'niemeyer:x:500:500::/home/niemeyer:/bin/bash\n',
+ b'postgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\n',
+ b'mysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\n',
+ b'www:x:103:104::/var/www:/bin/false\n',
+ ]
+ TEXT = b''.join(TEXT_LINES)
+ DATA = b'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7 [...]
+ BAD_DATA = b'this is not a valid bzip2 file'
+
+ def setUp(self):
+ self.filename = support.TESTFN
+
+ def tearDown(self):
+ if os.path.isfile(self.filename):
+ os.unlink(self.filename)
+
+ # In some of the tests, we need to verify the contents of multi-stream
+ # files, but bz2.decompress() could not handle this case prior to 3.3.
+ def decompress(self, data):
+ results = []
+ while True:
+ decomp = bz2.BZ2Decompressor()
+ results.append(decomp.decompress(data))
+ try:
+ decomp.decompress(b"")
+ except EOFError:
+ if not decomp.unused_data:
+ return b"".join(results)
+ data = decomp.unused_data
+ else:
+ raise ValueError("Compressed data ended before the "
+ "end-of-stream marker was reached")
+
+
+class BZ2FileTest(BaseTest):
+ "Test the BZ2File class."
+
+ def createTempFile(self, streams=1, suffix=b""):
+ with open(self.filename, "wb") as f:
+ f.write(self.DATA * streams)
+ f.write(suffix)
+
+ def testBadArgs(self):
+ self.assertRaises(TypeError, BZ2File, 123.456)
+ self.assertRaises(ValueError, BZ2File, "/dev/null", "z")
+ self.assertRaises(ValueError, BZ2File, "/dev/null", "rx")
+ self.assertRaises(ValueError, BZ2File, "/dev/null", "rbt")
+ self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=0)
+ self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=10)
+
+ def testRead(self):
+ self.createTempFile()
+ with BZ2File(self.filename) as bz2f:
+ self.assertRaises(TypeError, bz2f.read, None)
+ self.assertEqual(bz2f.read(), self.TEXT)
+
+ def testReadBadFile(self):
+ self.createTempFile(streams=0, suffix=self.BAD_DATA)
+ with BZ2File(self.filename) as bz2f:
+ self.assertRaises(IOError, bz2f.read)
+
+ def testReadMultiStream(self):
+ self.createTempFile(streams=5)
+ with BZ2File(self.filename) as bz2f:
+ self.assertRaises(TypeError, bz2f.read, None)
+ self.assertEqual(bz2f.read(), self.TEXT * 5)
+
+ def testReadMonkeyMultiStream(self):
+ # Test BZ2File.read() on a multi-stream archive where a stream
+ # boundary coincides with the end of the raw read buffer.
+ buffer_size = bz2file._BUFFER_SIZE
+ bz2file._BUFFER_SIZE = len(self.DATA)
+ try:
+ self.createTempFile(streams=5)
+ with BZ2File(self.filename) as bz2f:
+ self.assertRaises(TypeError, bz2f.read, None)
+ self.assertEqual(bz2f.read(), self.TEXT * 5)
+ finally:
+ bz2file._BUFFER_SIZE = buffer_size
+
+ def testReadTrailingJunk(self):
+ self.createTempFile(suffix=self.BAD_DATA)
+ with BZ2File(self.filename) as bz2f:
+ self.assertEqual(bz2f.read(), self.TEXT)
+
+ def testReadMultiStreamTrailingJunk(self):
+ self.createTempFile(streams=5, suffix=self.BAD_DATA)
+ with BZ2File(self.filename) as bz2f:
+ self.assertEqual(bz2f.read(), self.TEXT * 5)
+
+ def testRead0(self):
+ self.createTempFile()
+ with BZ2File(self.filename) as bz2f:
+ self.assertRaises(TypeError, bz2f.read, None)
+ self.assertEqual(bz2f.read(0), b"")
+
+ def testReadChunk10(self):
+ self.createTempFile()
+ with BZ2File(self.filename) as bz2f:
+ text = b''
+ while True:
+ str = bz2f.read(10)
+ if not str:
+ break
+ text += str
+ self.assertEqual(text, self.TEXT)
+
+ def testReadChunk10MultiStream(self):
+ self.createTempFile(streams=5)
+ with BZ2File(self.filename) as bz2f:
+ text = b''
+ while True:
+ str = bz2f.read(10)
+ if not str:
+ break
+ text += str
+ self.assertEqual(text, self.TEXT * 5)
+
+ def testRead100(self):
+ self.createTempFile()
+ with BZ2File(self.filename) as bz2f:
+ self.assertEqual(bz2f.read(100), self.TEXT[:100])
+
+ def testPeek(self):
+ self.createTempFile()
+ with BZ2File(self.filename) as bz2f:
+ pdata = bz2f.peek()
+ self.assertNotEqual(len(pdata), 0)
+ self.assertTrue(self.TEXT.startswith(pdata))
+ self.assertEqual(bz2f.read(), self.TEXT)
+
+ def testReadInto(self):
+ self.createTempFile()
+ with BZ2File(self.filename) as bz2f:
+ n = 128
+ b = bytearray(n)
+ self.assertEqual(bz2f.readinto(b), n)
+ self.assertEqual(b, self.TEXT[:n])
+ n = len(self.TEXT) - n
+ b = bytearray(len(self.TEXT))
+ self.assertEqual(bz2f.readinto(b), n)
+ self.assertEqual(b[:n], self.TEXT[-n:])
+
+ def testReadLine(self):
+ self.createTempFile()
+ with BZ2File(self.filename) as bz2f:
+ self.assertRaises(TypeError, bz2f.readline, None)
+ for line in self.TEXT_LINES:
+ self.assertEqual(bz2f.readline(), line)
+
+ def testReadLineMultiStream(self):
+ self.createTempFile(streams=5)
+ with BZ2File(self.filename) as bz2f:
+ self.assertRaises(TypeError, bz2f.readline, None)
+ for line in self.TEXT_LINES * 5:
+ self.assertEqual(bz2f.readline(), line)
+
+ def testReadLines(self):
+ self.createTempFile()
+ with BZ2File(self.filename) as bz2f:
+ self.assertRaises(TypeError, bz2f.readlines, None)
+ self.assertEqual(bz2f.readlines(), self.TEXT_LINES)
+
+ def testReadLinesMultiStream(self):
+ self.createTempFile(streams=5)
+ with BZ2File(self.filename) as bz2f:
+ self.assertRaises(TypeError, bz2f.readlines, None)
+ self.assertEqual(bz2f.readlines(), self.TEXT_LINES * 5)
+
+ def testIterator(self):
+ self.createTempFile()
+ with BZ2File(self.filename) as bz2f:
+ self.assertEqual(list(iter(bz2f)), self.TEXT_LINES)
+
+ def testIteratorMultiStream(self):
+ self.createTempFile(streams=5)
+ with BZ2File(self.filename) as bz2f:
+ self.assertEqual(list(iter(bz2f)), self.TEXT_LINES * 5)
+
+ def testClosedIteratorDeadlock(self):
+ # Issue #3309: Iteration on a closed BZ2File should release the lock.
+ self.createTempFile()
+ bz2f = BZ2File(self.filename)
+ bz2f.close()
+ self.assertRaises(ValueError, next, bz2f)
+ # This call will deadlock if the above call failed to release the lock.
+ self.assertRaises(ValueError, bz2f.readlines)
+
+ def testWrite(self):
+ with BZ2File(self.filename, "w") as bz2f:
+ self.assertRaises(TypeError, bz2f.write)
+ bz2f.write(self.TEXT)
+ with open(self.filename, 'rb') as f:
+ self.assertEqual(self.decompress(f.read()), self.TEXT)
+
+ def testWriteChunks10(self):
+ with BZ2File(self.filename, "w") as bz2f:
+ n = 0
+ while True:
+ str = self.TEXT[n*10:(n+1)*10]
+ if not str:
+ break
+ bz2f.write(str)
+ n += 1
+ with open(self.filename, 'rb') as f:
+ self.assertEqual(self.decompress(f.read()), self.TEXT)
+
+ def testWriteNonDefaultCompressLevel(self):
+ expected = bz2.compress(self.TEXT, compresslevel=5)
+ with BZ2File(self.filename, "w", compresslevel=5) as bz2f:
+ bz2f.write(self.TEXT)
+ with open(self.filename, "rb") as f:
+ self.assertEqual(f.read(), expected)
+
+ def testWriteLines(self):
+ with BZ2File(self.filename, "w") as bz2f:
+ self.assertRaises(TypeError, bz2f.writelines)
+ bz2f.writelines(self.TEXT_LINES)
+ # Issue #1535500: Calling writelines() on a closed BZ2File
+ # should raise an exception.
+ self.assertRaises(ValueError, bz2f.writelines, ["a"])
+ with open(self.filename, 'rb') as f:
+ self.assertEqual(self.decompress(f.read()), self.TEXT)
+
+ def testWriteMethodsOnReadOnlyFile(self):
+ with BZ2File(self.filename, "w") as bz2f:
+ bz2f.write(b"abc")
+
+ with BZ2File(self.filename, "r") as bz2f:
+ self.assertRaises(IOError, bz2f.write, b"a")
+ self.assertRaises(IOError, bz2f.writelines, [b"a"])
+
+ def testAppend(self):
+ with BZ2File(self.filename, "w") as bz2f:
+ self.assertRaises(TypeError, bz2f.write)
+ bz2f.write(self.TEXT)
+ with BZ2File(self.filename, "a") as bz2f:
+ self.assertRaises(TypeError, bz2f.write)
+ bz2f.write(self.TEXT)
+ with open(self.filename, 'rb') as f:
+ self.assertEqual(self.decompress(f.read()), self.TEXT * 2)
+
+ def testSeekForward(self):
+ self.createTempFile()
+ with BZ2File(self.filename) as bz2f:
+ self.assertRaises(TypeError, bz2f.seek)
+ bz2f.seek(150)
+ self.assertEqual(bz2f.read(), self.TEXT[150:])
+
+ def testSeekForwardAcrossStreams(self):
+ self.createTempFile(streams=2)
+ with BZ2File(self.filename) as bz2f:
+ self.assertRaises(TypeError, bz2f.seek)
+ bz2f.seek(len(self.TEXT) + 150)
+ self.assertEqual(bz2f.read(), self.TEXT[150:])
+
+ def testSeekBackwards(self):
+ self.createTempFile()
+ with BZ2File(self.filename) as bz2f:
+ bz2f.read(500)
+ bz2f.seek(-150, 1)
+ self.assertEqual(bz2f.read(), self.TEXT[500-150:])
+
+ def testSeekBackwardsAcrossStreams(self):
+ self.createTempFile(streams=2)
+ with BZ2File(self.filename) as bz2f:
+ readto = len(self.TEXT) + 100
+ while readto > 0:
+ readto -= len(bz2f.read(readto))
+ bz2f.seek(-150, 1)
+ self.assertEqual(bz2f.read(), self.TEXT[100-150:] + self.TEXT)
+
+ def testSeekBackwardsFromEnd(self):
+ self.createTempFile()
+ with BZ2File(self.filename) as bz2f:
+ bz2f.seek(-150, 2)
+ self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:])
+
+ def testSeekBackwardsFromEndAcrossStreams(self):
+ self.createTempFile(streams=2)
+ with BZ2File(self.filename) as bz2f:
+ bz2f.seek(-1000, 2)
+ self.assertEqual(bz2f.read(), (self.TEXT * 2)[-1000:])
+
+ def testSeekPostEnd(self):
+ self.createTempFile()
+ with BZ2File(self.filename) as bz2f:
+ bz2f.seek(150000)
+ self.assertEqual(bz2f.tell(), len(self.TEXT))
+ self.assertEqual(bz2f.read(), b"")
+
+ def testSeekPostEndMultiStream(self):
+ self.createTempFile(streams=5)
+ with BZ2File(self.filename) as bz2f:
+ bz2f.seek(150000)
+ self.assertEqual(bz2f.tell(), len(self.TEXT) * 5)
+ self.assertEqual(bz2f.read(), b"")
+
+ def testSeekPostEndTwice(self):
+ self.createTempFile()
+ with BZ2File(self.filename) as bz2f:
+ bz2f.seek(150000)
+ bz2f.seek(150000)
+ self.assertEqual(bz2f.tell(), len(self.TEXT))
+ self.assertEqual(bz2f.read(), b"")
+
+ def testSeekPostEndTwiceMultiStream(self):
+ self.createTempFile(streams=5)
+ with BZ2File(self.filename) as bz2f:
+ bz2f.seek(150000)
+ bz2f.seek(150000)
+ self.assertEqual(bz2f.tell(), len(self.TEXT) * 5)
+ self.assertEqual(bz2f.read(), b"")
+
+ def testSeekPreStart(self):
+ self.createTempFile()
+ with BZ2File(self.filename) as bz2f:
+ bz2f.seek(-150)
+ self.assertEqual(bz2f.tell(), 0)
+ self.assertEqual(bz2f.read(), self.TEXT)
+
+ def testSeekPreStartMultiStream(self):
+ self.createTempFile(streams=2)
+ with BZ2File(self.filename) as bz2f:
+ bz2f.seek(-150)
+ self.assertEqual(bz2f.tell(), 0)
+ self.assertEqual(bz2f.read(), self.TEXT * 2)
+
+ def testFileno(self):
+ self.createTempFile()
+ with open(self.filename, 'rb') as rawf:
+ bz2f = BZ2File(rawf)
+ try:
+ self.assertEqual(bz2f.fileno(), rawf.fileno())
+ finally:
+ bz2f.close()
+ self.assertRaises(ValueError, bz2f.fileno)
+
+ def testSeekable(self):
+ bz2f = BZ2File(BytesIO(self.DATA))
+ try:
+ self.assertTrue(bz2f.seekable())
+ bz2f.read()
+ self.assertTrue(bz2f.seekable())
+ finally:
+ bz2f.close()
+ self.assertRaises(ValueError, bz2f.seekable)
+
+ bz2f = BZ2File(BytesIO(), "w")
+ try:
+ self.assertFalse(bz2f.seekable())
+ finally:
+ bz2f.close()
+ self.assertRaises(ValueError, bz2f.seekable)
+
+ src = BytesIO(self.DATA)
+ src.seekable = lambda: False
+ bz2f = BZ2File(src)
+ try:
+ self.assertFalse(bz2f.seekable())
+ finally:
+ bz2f.close()
+ self.assertRaises(ValueError, bz2f.seekable)
+
+ def testReadable(self):
+ bz2f = BZ2File(BytesIO(self.DATA))
+ try:
+ self.assertTrue(bz2f.readable())
+ bz2f.read()
+ self.assertTrue(bz2f.readable())
+ finally:
+ bz2f.close()
+ self.assertRaises(ValueError, bz2f.readable)
+
+ bz2f = BZ2File(BytesIO(), "w")
+ try:
+ self.assertFalse(bz2f.readable())
+ finally:
+ bz2f.close()
+ self.assertRaises(ValueError, bz2f.readable)
+
+ def testWritable(self):
+ bz2f = BZ2File(BytesIO(self.DATA))
+ try:
+ self.assertFalse(bz2f.writable())
+ bz2f.read()
+ self.assertFalse(bz2f.writable())
+ finally:
+ bz2f.close()
+ self.assertRaises(ValueError, bz2f.writable)
+
+ bz2f = BZ2File(BytesIO(), "w")
+ try:
+ self.assertTrue(bz2f.writable())
+ finally:
+ bz2f.close()
+ self.assertRaises(ValueError, bz2f.writable)
+
+ def testOpenDel(self):
+ if platform.python_implementation() != "CPython":
+ self.skipTest("Test depends on CPython refcounting semantics")
+ self.createTempFile()
+ for i in range(10000):
+ o = BZ2File(self.filename)
+ del o
+
+ def testOpenNonexistent(self):
+ self.assertRaises(IOError, BZ2File, "/non/existent")
+
+ def testReadlinesNoNewline(self):
+ # Issue #1191043: readlines() fails on a file containing no newline.
+ data = b'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x00 \x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t'
+ with open(self.filename, "wb") as f:
+ f.write(data)
+ with BZ2File(self.filename) as bz2f:
+ lines = bz2f.readlines()
+ self.assertEqual(lines, [b'Test'])
+ with BZ2File(self.filename) as bz2f:
+ xlines = list(bz2f.readlines())
+ self.assertEqual(xlines, [b'Test'])
+
+ def testContextProtocol(self):
+ f = None
+ with BZ2File(self.filename, "wb") as f:
+ f.write(b"xxx")
+ f = BZ2File(self.filename, "rb")
+ f.close()
+ try:
+ with f:
+ pass
+ except ValueError:
+ pass
+ else:
+ self.fail("__enter__ on a closed file didn't raise an exception")
+ try:
+ with BZ2File(self.filename, "wb") as f:
+ 1/0
+ except ZeroDivisionError:
+ pass
+ else:
+ self.fail("1/0 didn't raise an exception")
+
+ def testThreading(self):
+ if not threading:
+ return
+ # Issue #7205: Using a BZ2File from several threads shouldn't deadlock.
+ data = b"1" * 2**20
+ nthreads = 10
+ with BZ2File(self.filename, 'wb') as f:
+ def comp():
+ for i in range(5):
+ f.write(data)
+ threads = [threading.Thread(target=comp) for i in range(nthreads)]
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+
+ def testWithoutThreading(self):
+ if not hasattr(support, "import_fresh_module"):
+ return
+ module = support.import_fresh_module("bz2file", blocked=("threading",))
+ with module.BZ2File(self.filename, "wb") as f:
+ f.write(b"abc")
+ with module.BZ2File(self.filename, "rb") as f:
+ self.assertEqual(f.read(), b"abc")
+
+ def testMixedIterationAndReads(self):
+ self.createTempFile()
+ linelen = len(self.TEXT_LINES[0])
+ halflen = linelen // 2
+ with BZ2File(self.filename) as bz2f:
+ bz2f.read(halflen)
+ self.assertEqual(next(bz2f), self.TEXT_LINES[0][halflen:])
+ self.assertEqual(bz2f.read(), self.TEXT[linelen:])
+ with BZ2File(self.filename) as bz2f:
+ bz2f.readline()
+ self.assertEqual(next(bz2f), self.TEXT_LINES[1])
+ self.assertEqual(bz2f.readline(), self.TEXT_LINES[2])
+ with BZ2File(self.filename) as bz2f:
+ bz2f.readlines()
+ self.assertRaises(StopIteration, next, bz2f)
+ self.assertEqual(bz2f.readlines(), [])
+
+ def testMultiStreamOrdering(self):
+ # Test the ordering of streams when reading a multi-stream archive.
+ data1 = b"foo" * 1000
+ data2 = b"bar" * 1000
+ with BZ2File(self.filename, "w") as bz2f:
+ bz2f.write(data1)
+ with BZ2File(self.filename, "a") as bz2f:
+ bz2f.write(data2)
+ with BZ2File(self.filename) as bz2f:
+ self.assertEqual(bz2f.read(), data1 + data2)
+
+ def testOpenBytesFilename(self):
+ str_filename = self.filename
+ try:
+ bytes_filename = str_filename.encode("ascii")
+ except UnicodeEncodeError:
+ self.skipTest("Temporary file name needs to be ASCII")
+ with BZ2File(bytes_filename, "wb") as f:
+ f.write(self.DATA)
+ with BZ2File(bytes_filename, "rb") as f:
+ self.assertEqual(f.read(), self.DATA)
+ # Sanity check that we are actually operating on the right file.
+ with BZ2File(str_filename, "rb") as f:
+ self.assertEqual(f.read(), self.DATA)
+
+
+ # Tests for a BZ2File wrapping another file object:
+
+ def testReadBytesIO(self):
+ with BytesIO(self.DATA) as bio:
+ with BZ2File(bio) as bz2f:
+ self.assertRaises(TypeError, bz2f.read, None)
+ self.assertEqual(bz2f.read(), self.TEXT)
+ self.assertFalse(bio.closed)
+
+ def testPeekBytesIO(self):
+ with BytesIO(self.DATA) as bio:
+ with BZ2File(bio) as bz2f:
+ pdata = bz2f.peek()
+ self.assertNotEqual(len(pdata), 0)
+ self.assertTrue(self.TEXT.startswith(pdata))
+ self.assertEqual(bz2f.read(), self.TEXT)
+
+ def testWriteBytesIO(self):
+ with BytesIO() as bio:
+ with BZ2File(bio, "w") as bz2f:
+ self.assertRaises(TypeError, bz2f.write)
+ bz2f.write(self.TEXT)
+ self.assertEqual(self.decompress(bio.getvalue()), self.TEXT)
+ self.assertFalse(bio.closed)
+
+ def testSeekForwardBytesIO(self):
+ with BytesIO(self.DATA) as bio:
+ with BZ2File(bio) as bz2f:
+ self.assertRaises(TypeError, bz2f.seek)
+ bz2f.seek(150)
+ self.assertEqual(bz2f.read(), self.TEXT[150:])
+
+ def testSeekBackwardsBytesIO(self):
+ with BytesIO(self.DATA) as bio:
+ with BZ2File(bio) as bz2f:
+ bz2f.read(500)
+ bz2f.seek(-150, 1)
+ self.assertEqual(bz2f.read(), self.TEXT[500-150:])
+
+ def test_read_truncated(self):
+ # Drop the eos_magic field (6 bytes) and CRC (4 bytes).
+ truncated = self.DATA[:-10]
+ with BZ2File(BytesIO(truncated)) as f:
+ self.assertRaises(EOFError, f.read)
+ with BZ2File(BytesIO(truncated)) as f:
+ self.assertEqual(f.read(len(self.TEXT)), self.TEXT)
+ self.assertRaises(EOFError, f.read, 1)
+ # Incomplete 4-byte file header, and block header of at least 146 bits.
+ for i in range(22):
+ with BZ2File(BytesIO(truncated[:i])) as f:
+ self.assertRaises(EOFError, f.read, 1)
+
+
+class OpenTest(BaseTest):
+ "Test the open function."
+
+ def open(self, *args, **kwargs):
+ return bz2file.open(*args, **kwargs)
+
+ def test_binary_modes(self):
+ modes = ["wb", "xb"] if bz2file._HAS_OPEN_X_MODE else ["wb"]
+ for mode in modes:
+ if mode == "xb":
+ support.unlink(self.filename)
+ with self.open(self.filename, mode) as f:
+ f.write(self.TEXT)
+ with open(self.filename, "rb") as f:
+ file_data = self.decompress(f.read())
+ self.assertEqual(file_data, self.TEXT)
+ with self.open(self.filename, "rb") as f:
+ self.assertEqual(f.read(), self.TEXT)
+ with self.open(self.filename, "ab") as f:
+ f.write(self.TEXT)
+ with open(self.filename, "rb") as f:
+ file_data = self.decompress(f.read())
+ self.assertEqual(file_data, self.TEXT * 2)
+
+ def test_implicit_binary_modes(self):
+ # Test implicit binary modes (no "b" or "t" in mode string).
+ modes = ["w", "x"] if bz2file._HAS_OPEN_X_MODE else ["w"]
+ for mode in modes:
+ if mode == "x":
+ support.unlink(self.filename)
+ with self.open(self.filename, mode) as f:
+ f.write(self.TEXT)
+ with open(self.filename, "rb") as f:
+ file_data = self.decompress(f.read())
+ self.assertEqual(file_data, self.TEXT)
+ with self.open(self.filename, "r") as f:
+ self.assertEqual(f.read(), self.TEXT)
+ with self.open(self.filename, "a") as f:
+ f.write(self.TEXT)
+ with open(self.filename, "rb") as f:
+ file_data = self.decompress(f.read())
+ self.assertEqual(file_data, self.TEXT * 2)
+
+ def test_text_modes(self):
+ text = self.TEXT.decode("ascii")
+ text_native_eol = text.replace("\n", os.linesep)
+ modes = ["wt", "xt"] if bz2file._HAS_OPEN_X_MODE else ["wt"]
+ for mode in modes:
+ if mode == "xt":
+ support.unlink(self.filename)
+ if not bz2file._HAS_OPEN_X_MODE:
+ continue
+ with self.open(self.filename, mode) as f:
+ f.write(text)
+ with open(self.filename, "rb") as f:
+ file_data = self.decompress(f.read()).decode("ascii")
+ self.assertEqual(file_data, text_native_eol)
+ with self.open(self.filename, "rt") as f:
+ self.assertEqual(f.read(), text)
+ with self.open(self.filename, "at") as f:
+ f.write(text)
+ with open(self.filename, "rb") as f:
+ file_data = self.decompress(f.read()).decode("ascii")
+ self.assertEqual(file_data, text_native_eol * 2)
+
+ def test_x_mode(self):
+ if not bz2file._HAS_OPEN_X_MODE:
+ return
+ for mode in ("x", "xb", "xt"):
+ support.unlink(self.filename)
+ with self.open(self.filename, mode) as f:
+ pass
+ with self.assertRaises(FileExistsError):
+ with self.open(self.filename, mode) as f:
+ pass
+
+ def test_fileobj(self):
+ with self.open(BytesIO(self.DATA), "r") as f:
+ self.assertEqual(f.read(), self.TEXT)
+ with self.open(BytesIO(self.DATA), "rb") as f:
+ self.assertEqual(f.read(), self.TEXT)
+ text = self.TEXT.decode("ascii")
+ with self.open(BytesIO(self.DATA), "rt") as f:
+ self.assertEqual(f.read(), text)
+
+ def test_bad_params(self):
+ # Test invalid parameter combinations.
+ self.assertRaises(ValueError,
+ self.open, self.filename, "wbt")
+ self.assertRaises(ValueError,
+ self.open, self.filename, "xbt")
+ self.assertRaises(ValueError,
+ self.open, self.filename, "rb", encoding="utf-8")
+ self.assertRaises(ValueError,
+ self.open, self.filename, "rb", errors="ignore")
+ self.assertRaises(ValueError,
+ self.open, self.filename, "rb", newline="\n")
+
+ def test_encoding(self):
+ # Test non-default encoding.
+ text = self.TEXT.decode("ascii")
+ text_native_eol = text.replace("\n", os.linesep)
+ with self.open(self.filename, "wt", encoding="utf-16-le") as f:
+ f.write(text)
+ with open(self.filename, "rb") as f:
+ file_data = self.decompress(f.read()).decode("utf-16-le")
+ self.assertEqual(file_data, text_native_eol)
+ with self.open(self.filename, "rt", encoding="utf-16-le") as f:
+ self.assertEqual(f.read(), text)
+
+ def test_encoding_error_handler(self):
+ # Test with non-default encoding error handler.
+ with self.open(self.filename, "wb") as f:
+ f.write(b"foo\xffbar")
+ with self.open(self.filename, "rt", encoding="ascii", errors="ignore") \
+ as f:
+ self.assertEqual(f.read(), "foobar")
+
+ def test_newline(self):
+ # Test with explicit newline (universal newline mode disabled).
+ text = self.TEXT.decode("ascii")
+ with self.open(self.filename, "wt", newline="\n") as f:
+ f.write(text)
+ with self.open(self.filename, "rt", newline="\r") as f:
+ self.assertEqual(f.readlines(), [text])
+
+
+if __name__ == '__main__':
+ unittest.main()
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/python-bz2file.git
More information about the debian-med-commit
mailing list