[Python-modules-commits] [python-parquet] 01/08: New upstream version 1.1

ChangZhuo Chen czchen at moszumanska.debian.org
Tue Sep 20 03:33:57 UTC 2016


This is an automated email from the git hooks/post-receive script.

czchen pushed a commit to branch master
in repository python-parquet.

commit ce128f075b66216c8daff74225f4f04109d1f5c2
Author: ChangZhuo Chen (陳昌倬) <czchen at debian.org>
Date:   Tue Sep 20 10:54:50 2016 +0800

    New upstream version 1.1
---
 MANIFEST.in                           |   1 +
 PKG-INFO                              |  22 ++
 README.rst                            |  83 +++++
 parquet.egg-info/PKG-INFO             |  22 ++
 parquet.egg-info/SOURCES.txt          |  18 ++
 parquet.egg-info/dependency_links.txt |   1 +
 parquet.egg-info/entry_points.txt     |   3 +
 parquet.egg-info/requires.txt         |   5 +
 parquet.egg-info/top_level.txt        |   1 +
 parquet/__init__.py                   | 480 ++++++++++++++++++++++++++++
 parquet/__main__.py                   |  61 ++++
 parquet/bitstring.py                  |  20 ++
 parquet/encoding.py                   | 228 ++++++++++++++
 parquet/parquet.thrift                | 573 ++++++++++++++++++++++++++++++++++
 parquet/schema.py                     |  48 +++
 parquet/thrift_filetransport.py       |  17 +
 setup.cfg                             |   5 +
 setup.py                              |  44 +++
 test/test_encoding.py                 | 135 ++++++++
 test/test_read_support.py             | 199 ++++++++++++
 20 files changed, 1966 insertions(+)

diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..89e8b96
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1 @@
+recursive-include parquet *.thrift
diff --git a/PKG-INFO b/PKG-INFO
new file mode 100644
index 0000000..836d1cf
--- /dev/null
+++ b/PKG-INFO
@@ -0,0 +1,22 @@
+Metadata-Version: 1.1
+Name: parquet
+Version: 1.1
+Summary: Python support for Parquet file format
+Home-page: https://github.com/jcrobak/parquet-python
+Author: Joe Crobak
+Author-email: joecrow at gmail.com
+License: Apache License 2.0
+Description: UNKNOWN
+Platform: UNKNOWN
+Classifier: Development Status :: 3 - Alpha
+Classifier: Intended Audience :: Developers
+Classifier: Intended Audience :: System Administrators
+Classifier: License :: OSI Approved :: Apache Software License
+Classifier: Programming Language :: Python
+Classifier: Programming Language :: Python :: 2
+Classifier: Programming Language :: Python :: 3
+Classifier: Programming Language :: Python :: 2.7
+Classifier: Programming Language :: Python :: 3.4
+Classifier: Programming Language :: Python :: 3.5
+Classifier: Programming Language :: Python :: Implementation :: CPython
+Classifier: Programming Language :: Python :: Implementation :: PyPy
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..72acb4e
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,83 @@
+parquet-python
+==============
+
+parquet-python is a pure-python implementation (currently with only
+read-support) of the `parquet
+format <https://github.com/Parquet/parquet-format>`_. It comes with a
+script for reading parquet files and outputting the data to stdout as
+JSON or TSV (without the overhead of JVM startup). Performance has not
+yet been optimized, but it's useful for debugging and quick viewing of
+data in files.
+
+Not all parts of the parquet format have been implemented or tested yet
+(e.g. nested data); see the Todos below for a full list. With that said,
+parquet-python is capable of reading all the data files from the
+`parquet-compatibility <https://github.com/Parquet/parquet-compatibility>`_
+project.
+
+requirements
+============
+
+parquet-python has been tested on python 2.7, 3.4, and 3.5. It depends
+on ``thriftpy`` (>=0.3.6) and ``python-snappy`` (for snappy-compressed
+files); on Python 2.7 it also needs ``backports.csv``.
+
+getting started
+===============
+
+parquet-python is available on PyPI and can be installed using
+``pip install parquet``. The package includes the ``parquet``
+command for reading parquet files, e.g. ``parquet test.parquet``.
+See ``parquet --help`` for full usage.
+
+Example
+-------
+
+parquet-python currently has two programmatic interfaces with similar
+functionality to Python's csv reader. First, it supports a DictReader
+which returns a dictionary per row. Second, it has a reader which
+returns a list of values for each row. Both functions require a file-like
+object and support an optional ``columns`` argument to read only the
+specified columns.
+
+.. code:: python
+
+
+    import parquet
+    import json
+
+    ## assuming parquet file with two rows and three columns:
+    ## foo bar baz
+    ## 1   2   3
+    ## 4   5   6
+
+    with open("test.parquet") as fo:
+       # prints:
+       # {"foo": 1, "bar": 2}
+       # {"foo": 4, "bar": 5}
+       for row in parquet.DictReader(fo, columns=['foo', 'bar']):
+           print(json.dumps(row))
+
+
+    with open("test.parquet") as fo:
+       # prints:
+       # 1,2
+       # 4,5
+       for row in parquet.reader(fo, columns=['foo', 'bar']):
+           print(",".join([str(r) for r in row]))
+
+Todos
+=====
+
+-  Support the deprecated bitpacking
+-  Fix handling of repetition-levels and definition-levels
+-  Tests for nested schemas, null data
+-  Support reading of data from HDFS via snakebite and/or webhdfs.
+-  Implement writing
+-  Performance evaluation and optimization (e.g. how does it compare to
+   the C++ and Java implementations?)
+
+Contributing
+============
+
+Contributions are accepted via pull requests. Please include tests with
+your changes and follow `PEP 8 <http://www.python.org/dev/peps/pep-0008/>`_.
diff --git a/parquet.egg-info/PKG-INFO b/parquet.egg-info/PKG-INFO
new file mode 100644
index 0000000..836d1cf
--- /dev/null
+++ b/parquet.egg-info/PKG-INFO
@@ -0,0 +1,22 @@
+Metadata-Version: 1.1
+Name: parquet
+Version: 1.1
+Summary: Python support for Parquet file format
+Home-page: https://github.com/jcrobak/parquet-python
+Author: Joe Crobak
+Author-email: joecrow at gmail.com
+License: Apache License 2.0
+Description: UNKNOWN
+Platform: UNKNOWN
+Classifier: Development Status :: 3 - Alpha
+Classifier: Intended Audience :: Developers
+Classifier: Intended Audience :: System Administrators
+Classifier: License :: OSI Approved :: Apache Software License
+Classifier: Programming Language :: Python
+Classifier: Programming Language :: Python :: 2
+Classifier: Programming Language :: Python :: 3
+Classifier: Programming Language :: Python :: 2.7
+Classifier: Programming Language :: Python :: 3.4
+Classifier: Programming Language :: Python :: 3.5
+Classifier: Programming Language :: Python :: Implementation :: CPython
+Classifier: Programming Language :: Python :: Implementation :: PyPy
diff --git a/parquet.egg-info/SOURCES.txt b/parquet.egg-info/SOURCES.txt
new file mode 100644
index 0000000..d68cd75
--- /dev/null
+++ b/parquet.egg-info/SOURCES.txt
@@ -0,0 +1,18 @@
+MANIFEST.in
+README.rst
+setup.py
+parquet/__init__.py
+parquet/__main__.py
+parquet/bitstring.py
+parquet/encoding.py
+parquet/parquet.thrift
+parquet/schema.py
+parquet/thrift_filetransport.py
+parquet.egg-info/PKG-INFO
+parquet.egg-info/SOURCES.txt
+parquet.egg-info/dependency_links.txt
+parquet.egg-info/entry_points.txt
+parquet.egg-info/requires.txt
+parquet.egg-info/top_level.txt
+test/test_encoding.py
+test/test_read_support.py
\ No newline at end of file
diff --git a/parquet.egg-info/dependency_links.txt b/parquet.egg-info/dependency_links.txt
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/parquet.egg-info/dependency_links.txt
@@ -0,0 +1 @@
+
diff --git a/parquet.egg-info/entry_points.txt b/parquet.egg-info/entry_points.txt
new file mode 100644
index 0000000..e27f517
--- /dev/null
+++ b/parquet.egg-info/entry_points.txt
@@ -0,0 +1,3 @@
+[console_scripts]
+parquet = parquet.__main__:main
+
diff --git a/parquet.egg-info/requires.txt b/parquet.egg-info/requires.txt
new file mode 100644
index 0000000..aa0f656
--- /dev/null
+++ b/parquet.egg-info/requires.txt
@@ -0,0 +1,5 @@
+python-snappy
+thriftpy>=0.3.6
+
+[:python_version=="2.7"]
+backports.csv
diff --git a/parquet.egg-info/top_level.txt b/parquet.egg-info/top_level.txt
new file mode 100644
index 0000000..c1d924a
--- /dev/null
+++ b/parquet.egg-info/top_level.txt
@@ -0,0 +1 @@
+parquet
diff --git a/parquet/__init__.py b/parquet/__init__.py
new file mode 100644
index 0000000..356be2f
--- /dev/null
+++ b/parquet/__init__.py
@@ -0,0 +1,480 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+import gzip
+import io
+import json
+import logging
+import os
+import struct
+import sys
+from collections import defaultdict, OrderedDict
+
+import thriftpy
+from thriftpy.protocol.compact import TCompactProtocolFactory
+
+from .thrift_filetransport import TFileTransport
+from . import encoding
+from . import schema
+
+PY3 = sys.version_info > (3,)
+
+if PY3:
+    import csv
+else:
+    from backports import csv
+
+THRIFT_FILE = os.path.join(os.path.dirname(__file__), "parquet.thrift")
+parquet_thrift = thriftpy.load(THRIFT_FILE, module_name="parquet_thrift")
+
+
+logger = logging.getLogger("parquet")
+
+try:
+    import snappy
+except ImportError:
+    logger.warn(
+        "Couldn't import snappy. Support for snappy compression disabled.")
+
+
+class ParquetFormatException(Exception):
+    pass
+
+
+def _check_header_magic_bytes(fo):
+    "Returns true if the file-like obj has the PAR1 magic bytes at the header"
+    fo.seek(0, 0)
+    magic = fo.read(4)
+    return magic == b'PAR1'
+
+
+def _check_footer_magic_bytes(fo):
+    "Returns true if the file-like obj has the PAR1 magic bytes at the footer"
+    fo.seek(-4, 2)  # seek to four bytes from the end of the file
+    magic = fo.read(4)
+    return magic == b'PAR1'
+
+
+def _get_footer_size(fo):
+    "Readers the footer size in bytes, which is serialized as little endian"
+    fo.seek(-8, 2)
+    tup = struct.unpack("<i", fo.read(4))
+    return tup[0]
+
+
+def _read_footer(fo):
+    """Reads the footer from the given file object, returning a FileMetaData
+    object. This method assumes that the fo references a valid parquet file"""
+    footer_size = _get_footer_size(fo)
+    if logger.isEnabledFor(logging.DEBUG):
+        logger.debug("Footer size in bytes: %s", footer_size)
+    fo.seek(-(8 + footer_size), 2)  # seek to beginning of footer
+    tin = TFileTransport(fo)
+    pin = TCompactProtocolFactory().get_protocol(tin)
+    fmd = parquet_thrift.FileMetaData()
+    fmd.read(pin)
+    return fmd
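+
+# File layout relied on by the footer helpers above:
+#
+#   <column chunk / page data>
+#   <FileMetaData (thrift, compact protocol)>
+#   <4-byte little-endian footer length> <4-byte magic "PAR1">
+#
+# so the footer is located by seeking 8 bytes from the end, reading the
+# length, and then seeking back (8 + length) bytes.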
+
+
+def _read_page_header(fo):
+    """Reads the page_header from the given fo"""
+    tin = TFileTransport(fo)
+    pin = TCompactProtocolFactory().get_protocol(tin)
+    ph = parquet_thrift.PageHeader()
+    ph.read(pin)
+    return ph
+
+
+def read_footer(filename):
+    """Reads and returns the FileMetaData object for the given file."""
+    with open(filename, 'rb') as fo:
+        if not _check_header_magic_bytes(fo) or \
+           not _check_footer_magic_bytes(fo):
+            raise ParquetFormatException("{0} is not a valid parquet file "
+                                         "(missing magic bytes)"
+                                         .format(filename))
+        return _read_footer(fo)
+
+
+def _get_name(type_, value):
+    """Returns the name for the given value of the given type_ unless value is
+    None, in which case it returns empty string"""
+    return type_._VALUES_TO_NAMES[value] if value is not None else "None"
+
+
+def _get_offset(cmd):
+    """Returns the offset into the cmd based upon if it's a dictionary page or
+    a data page"""
+    dict_offset = cmd.dictionary_page_offset
+    data_offset = cmd.data_page_offset
+    if dict_offset is None or data_offset < dict_offset:
+        return data_offset
+    return dict_offset
+
+
+def dump_metadata(filename, show_row_group_metadata, out=sys.stdout):
+    def println(value):
+        out.write(value + "\n")
+    footer = read_footer(filename)
+    println("File Metadata: {0}".format(filename))
+    println("  Version: {0}".format(footer.version))
+    println("  Num Rows: {0}".format(footer.num_rows))
+    println("  k/v metadata: ")
+    if footer.key_value_metadata and len(footer.key_value_metadata) > 0:
+        for kv in footer.key_value_metadata:
+            println("    {0}={1}".format(kv.key, kv.value))
+    else:
+        println("    (none)")
+    println("  schema: ")
+    for se in footer.schema:
+        println("    {name} ({type}): length={type_length}, "
+                "repetition={repetition_type}, "
+                "children={num_children}, "
+                "converted_type={converted_type}".format(
+                    name=se.name,
+                    type=parquet_thrift.Type._VALUES_TO_NAMES[se.type] if se.type else None,
+                    type_length=se.type_length,
+                    repetition_type=_get_name(parquet_thrift.FieldRepetitionType,
+                                              se.repetition_type),
+                    num_children=se.num_children,
+                    converted_type=se.converted_type))
+    if show_row_group_metadata:
+        println("  row groups: ")
+        for rg in footer.row_groups:
+            num_rows = rg.num_rows
+            bytes = rg.total_byte_size
+            println(
+                "  rows={num_rows}, bytes={bytes}".format(num_rows=num_rows,
+                                                          bytes=bytes))
+            println("    chunks:")
+            for cg in rg.columns:
+                cmd = cg.meta_data
+                println("      type={type} file_offset={offset} "
+                        "compression={codec} "
+                        "encodings={encodings} path_in_schema={path_in_schema} "
+                        "num_values={num_values} uncompressed_bytes={raw_bytes} "
+                        "compressed_bytes={compressed_bytes} "
+                        "data_page_offset={data_page_offset} "
+                        "dictionary_page_offset={dictionary_page_offset}".format(
+                            type=_get_name(parquet_thrift.Type, cmd.type),
+                            offset=cg.file_offset,
+                            codec=_get_name(parquet_thrift.CompressionCodec, cmd.codec),
+                            encodings=",".join(
+                                [_get_name(
+                                    parquet_thrift.Encoding, s) for s in cmd.encodings]),
+                            path_in_schema=cmd.path_in_schema,
+                            num_values=cmd.num_values,
+                            raw_bytes=cmd.total_uncompressed_size,
+                            compressed_bytes=cmd.total_compressed_size,
+                            data_page_offset=cmd.data_page_offset,
+                            dictionary_page_offset=cmd.dictionary_page_offset))
+                with open(filename, 'rb') as fo:
+                    offset = _get_offset(cmd)
+                    fo.seek(offset, 0)
+                    values_read = 0
+                    println("      pages: ")
+                    while values_read < num_rows:
+                        ph = _read_page_header(fo)
+                        # seek past current page.
+                        fo.seek(ph.compressed_page_size, 1)
+                        daph = ph.data_page_header
+                        type_ = _get_name(parquet_thrift.PageType, ph.type)
+                        raw_bytes = ph.uncompressed_page_size
+                        num_values = None
+                        if ph.type == parquet_thrift.PageType.DATA_PAGE:
+                            num_values = daph.num_values
+                            values_read += num_values
+                        if ph.type == parquet_thrift.PageType.DICTIONARY_PAGE:
+                            pass
+                            #num_values = diph.num_values
+
+                        encoding_type = None
+                        def_level_encoding = None
+                        rep_level_encoding = None
+                        if daph:
+                            encoding_type = _get_name(parquet_thrift.Encoding, daph.encoding)
+                            def_level_encoding = _get_name(
+                                parquet_thrift.Encoding, daph.definition_level_encoding)
+                            rep_level_encoding = _get_name(
+                                parquet_thrift.Encoding, daph.repetition_level_encoding)
+
+                        println("        page header: type={type} "
+                                "uncompressed_size={raw_bytes} "
+                                "num_values={num_values} encoding={encoding} "
+                                "def_level_encoding={def_level_encoding} "
+                                "rep_level_encoding={rep_level_encoding}".format(
+                                    type=type_,
+                                    raw_bytes=raw_bytes,
+                                    num_values=num_values,
+                                    encoding=encoding_type,
+                                    def_level_encoding=def_level_encoding,
+                                    rep_level_encoding=rep_level_encoding))
+
+
+def _read_page(fo, page_header, column_metadata):
+    """Internal function to read the data page from the given file-object
+    and convert it to raw, uncompressed bytes (if necessary)."""
+    bytes_from_file = fo.read(page_header.compressed_page_size)
+    codec = column_metadata.codec
+    if codec is not None and codec != parquet_thrift.CompressionCodec.UNCOMPRESSED:
+        if column_metadata.codec == parquet_thrift.CompressionCodec.SNAPPY:
+            raw_bytes = snappy.decompress(bytes_from_file)
+        elif column_metadata.codec == parquet_thrift.CompressionCodec.GZIP:
+            io_obj = io.BytesIO(bytes_from_file)
+            with gzip.GzipFile(fileobj=io_obj, mode='rb') as f:
+                raw_bytes = f.read()
+        else:
+            raise ParquetFormatException(
+                "Unsupported Codec: {0}".format(codec))
+    else:
+        raw_bytes = bytes_from_file
+
+    if logger.isEnabledFor(logging.DEBUG):
+        logger.debug(
+            "Read page with compression type {0}. Bytes {1} -> {2}".format(
+            _get_name(parquet_thrift.CompressionCodec, codec),
+            page_header.compressed_page_size,
+            page_header.uncompressed_page_size))
+    assert len(raw_bytes) == page_header.uncompressed_page_size, \
+        "found {0} raw bytes (expected {1})".format(
+            len(raw_bytes),
+            page_header.uncompressed_page_size)
+    return raw_bytes
+
+
+def _read_data(fo, fo_encoding, value_count, bit_width):
+    """Internal method to read data from the file-object using the given
+    encoding. The data could be definition levels, repetition levels, or
+    actual values.
+    """
+    vals = []
+    if fo_encoding == parquet_thrift.Encoding.RLE:
+        seen = 0
+        while seen < value_count:
+            values = encoding.read_rle_bit_packed_hybrid(fo, bit_width)
+            if values is None:
+                break  # EOF was reached.
+            vals += values
+            seen += len(values)
+    elif fo_encoding == parquet_thrift.Encoding.BIT_PACKED:
+        raise NotImplementedError("Bit packing not yet supported")
+
+    return vals
+
+
+def read_data_page(fo, schema_helper, page_header, column_metadata,
+                   dictionary):
+    """Reads the datapage from the given file-like object based upon the
+    metadata in the schema_helper, page_header, column_metadata, and
+    (optional) dictionary. Returns a list of values.
+    """
+    daph = page_header.data_page_header
+    raw_bytes = _read_page(fo, page_header, column_metadata)
+    io_obj = io.BytesIO(raw_bytes)
+    vals = []
+    debug_logging = logger.isEnabledFor(logging.DEBUG)
+
+    if debug_logging:
+        logger.debug("  definition_level_encoding: %s",
+                     _get_name(parquet_thrift.Encoding, daph.definition_level_encoding))
+        logger.debug("  repetition_level_encoding: %s",
+                     _get_name(parquet_thrift.Encoding, daph.repetition_level_encoding))
+        logger.debug("  encoding: %s", _get_name(parquet_thrift.Encoding, daph.encoding))
+
+    # definition levels are skipped if data is required.
+    definition_levels = None
+    num_nulls = 0
+    max_definition_level = -1
+    if not schema_helper.is_required(column_metadata.path_in_schema[-1]):
+        max_definition_level = schema_helper.max_definition_level(
+            column_metadata.path_in_schema)
+        bit_width = encoding.width_from_max_int(max_definition_level)
+        if debug_logging:
+            logger.debug("  max def level: %s   bit_width: %s",
+                         max_definition_level, bit_width)
+        if bit_width == 0:
+            definition_levels = [0] * daph.num_values
+        else:
+            definition_levels = _read_data(io_obj,
+                                           daph.definition_level_encoding,
+                                           daph.num_values,
+                                           bit_width)[:daph.num_values]
+
+        # anything that isn't at the max definition level is a null.
+        num_nulls = len(definition_levels) - definition_levels.count(max_definition_level)
+        if debug_logging:
+            logger.debug("  Definition levels: %s", len(definition_levels))
+
+    # repetition levels are skipped if data is at the first level.
+    repetition_levels = None
+    if len(column_metadata.path_in_schema) > 1:
+        max_repetition_level = schema_helper.max_repetition_level(
+            column_metadata.path_in_schema)
+        bit_width = encoding.width_from_max_int(max_repetition_level)
+        repetition_levels = _read_data(io_obj,
+                                       daph.repetition_level_encoding,
+                                       daph.num_values,
+                                       bit_width)
+
+    # TODO Actually use the repetition levels.
+    if daph.encoding == parquet_thrift.Encoding.PLAIN:
+        read_values = \
+            encoding.read_plain(io_obj, column_metadata.type, daph.num_values - num_nulls)
+        if definition_levels:
+            it = iter(read_values)
+            vals.extend([next(it) if level == max_definition_level else None for level in definition_levels])
+        else:
+            vals.extend(read_values)
+        if debug_logging:
+            logger.debug("  Values: %s, nulls: %s", len(vals), num_nulls)
+    elif daph.encoding == parquet_thrift.Encoding.PLAIN_DICTIONARY:
+        # bit_width is stored as single byte.
+        bit_width = struct.unpack("<B", io_obj.read(1))[0]
+        if debug_logging:
+            logger.debug("bit_width: %d", bit_width)
+        total_seen = 0
+        dict_values_bytes = io_obj.read()
+        dict_values_io_obj = io.BytesIO(dict_values_bytes)
+        # TODO jcrobak -- not sure that this loop is needed?
+        while total_seen < daph.num_values:
+            values = encoding.read_rle_bit_packed_hybrid(
+                dict_values_io_obj, bit_width, len(dict_values_bytes))
+            if len(values) + total_seen > daph.num_values:
+                values = values[0: daph.num_values - total_seen]
+            vals += [dictionary[v] for v in values]
+            total_seen += len(values)
+    else:
+        raise ParquetFormatException("Unsupported encoding: %s",
+                                     _get_name(parquet_thrift.Encoding, daph.encoding))
+    return vals
+
+
+def read_dictionary_page(fo, page_header, column_metadata):
+    raw_bytes = _read_page(fo, page_header, column_metadata)
+    io_obj = io.BytesIO(raw_bytes)
+    return encoding.read_plain(io_obj, column_metadata.type,
+        page_header.dictionary_page_header.num_values)
+
+
+def DictReader(fo, columns=None):
+    """
+    Reader for a parquet file object.
+
+    This function is a generator returning an OrderedDict for each row
+    of data in the parquet file. Nested values will be flattened into the
+    top-level dict and can be referenced with '.' notation (e.g. 'foo' -> 'bar'
+    is referenced as 'foo.bar')
+
+    :param fo: the file containing parquet data
+    :param columns: the columns to include. If None (default), all columns
+                    are included. Nested values are referenced with "." notation
+    """
+    footer = _read_footer(fo)
+    keys = columns if columns else [s.name for s in
+                                    footer.schema if s.type]
+
+    for row in reader(fo, columns):
+        yield OrderedDict(zip(keys, row))
+
+def reader(fo, columns=None):
+    """
+    Reader for a parquet file object.
+
+    This function is a generator returning a list of values for each row
+    of data in the parquet file.
+
+    :param fo: the file containing parquet data
+    :param columns: the columns to include. If None (default), all columns
+                    are included. Nested values are referenced with "." notation
+    """
+    footer = _read_footer(fo)
+    schema_helper = schema.SchemaHelper(footer.schema)
+    keys = columns if columns else [s.name for s in
+                                    footer.schema if s.type]
+    debug_logging = logger.isEnabledFor(logging.DEBUG)
+    for rg in footer.row_groups:
+        res = defaultdict(list)
+        row_group_rows = rg.num_rows
+        for idx, cg in enumerate(rg.columns):
+            dict_items = []
+            cmd = cg.meta_data
+            # skip if the list of columns is specified and this isn't in it
+            if columns and not ".".join(cmd.path_in_schema) in columns:
+                continue
+
+            offset = _get_offset(cmd)
+            fo.seek(offset, 0)
+            values_seen = 0
+            if debug_logging:
+                logger.debug("reading column chunk of type: %s",
+                             _get_name(parquet_thrift.Type, cmd.type))
+            while values_seen < row_group_rows:
+                ph = _read_page_header(fo)
+                if debug_logging:
+                    logger.debug("Reading page (type=%s, "
+                                 "uncompressed=%s bytes, "
+                                 "compressed=%s bytes)",
+                                 _get_name(parquet_thrift.PageType, ph.type),
+                                 ph.uncompressed_page_size,
+                                 ph.compressed_page_size)
+
+                if ph.type == parquet_thrift.PageType.DATA_PAGE:
+                    values = read_data_page(fo, schema_helper, ph, cmd,
+                                            dict_items)
+                    res[".".join(cmd.path_in_schema)] += values
+                    values_seen += ph.data_page_header.num_values
+                elif ph.type == parquet_thrift.PageType.DICTIONARY_PAGE:
+                    if debug_logging:
+                        logger.debug(ph)
+                    assert dict_items == []
+                    dict_items = read_dictionary_page(fo, ph, cmd)
+                    if debug_logging:
+                        logger.debug("Dictionary: %s", str(dict_items))
+                else:
+                    logger.warn("Skipping unknown page type={0}".format(
+                        _get_name(parquet_thrift.PageType, ph.type)))
+
+        for i in range(rg.num_rows):
+            yield [res[k][i] for k in keys if res[k]]
+
+class JsonWriter(object):
+    def __init__(self, out):
+        self._out = out
+
+    def writerow(self, row):
+        json_text = json.dumps(row)
+        if type(json_text) is bytes:
+            json_text = json_text.decode('utf-8')
+        self._out.write(json_text)
+        self._out.write(u'\n')
+
+def _dump(fo, options, out=sys.stdout):
+
+    # writer and keys are lazily loaded. We don't know the keys until we have
+    # the first item. And we need the keys for the csv writer.
+    total_count = 0
+    writer = None
+    keys = None
+    for row in DictReader(fo, options.col):
+        if not keys:
+            keys = row.keys()
+        if not writer:
+            writer = csv.DictWriter(out, keys, delimiter=u'\t', quotechar=u'\'',
+                quoting=csv.QUOTE_MINIMAL) if options.format == 'csv' \
+                    else JsonWriter(out) if options.format == 'json' \
+                    else None
+        if total_count == 0 and options.format == "csv" and not options.no_headers:
+            writer.writeheader()
+        if options.limit != -1 and total_count >= options.limit:
+            return
+        row_unicode = {k: v.decode("utf-8") if type(v) is bytes else v for k, v in row.items()}
+        writer.writerow(row_unicode)
+        total_count += 1
+
+
+def dump(filename, options, out=sys.stdout):
+    with open(filename, 'rb') as fo:
+        return _dump(fo, options=options, out=out)
diff --git a/parquet/__main__.py b/parquet/__main__.py
new file mode 100644
index 0000000..9604fff
--- /dev/null
+++ b/parquet/__main__.py
@@ -0,0 +1,61 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+
+import argparse
+import logging
+import sys
+
+
+def setup_logging(options=None):
+    level = logging.DEBUG if options is not None and options.debug \
+        else logging.WARNING
+    console = logging.StreamHandler()
+    console.setLevel(level)
+    formatter = logging.Formatter('%(name)s: %(levelname)-8s %(message)s')
+    console.setFormatter(formatter)
+    logging.getLogger('parquet').setLevel(level)
+    logging.getLogger('parquet').addHandler(console)
+
+
+def main(argv=None):
+    argv = argv or sys.argv[1:]
+
+    parser = argparse.ArgumentParser('parquet',
+                                     description='Read parquet files')
+    parser.add_argument('--metadata', action='store_true',
+                        help='show metadata on file')
+    parser.add_argument('--row-group-metadata', action='store_true',
+                        help="show per row group metadata")
+    parser.add_argument('--no-data', action='store_true',
+                        help="don't dump any data from the file")
+    parser.add_argument('--limit', action='store', type=int, default=-1,
+                        help='max records to output')
+    parser.add_argument('--col', action='append', type=str,
+                        help='only include this column (can be '
+                             'specified multiple times)')
+    parser.add_argument('--no-headers', action='store_true',
+                        help='skip headers in output (only applies if '
+                             'format=csv)')
+    parser.add_argument('--format', action='store', type=str, default='csv',
+                        help='format for the output data. can be csv or json.')
+    parser.add_argument('--debug', action='store_true',
+                        help='log debug info to stderr')
+    parser.add_argument('file',
+                        help='path to the file to parse')
+
+    args = parser.parse_args(argv)
+
+    setup_logging(args)
+
+    import parquet
+
+    if args.metadata:
+        parquet.dump_metadata(args.file, args.row_group_metadata)
+    if not args.no_data:
+        parquet.dump(args.file, args)
+
+if __name__ == '__main__':
+    main()
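+
+# Example invocations (using the flags defined above):
+#   parquet test.parquet                            # rows as tab-separated values
+#   parquet --format json --limit 10 test.parquet   # first 10 rows as JSON lines
+#   parquet --metadata --no-data test.parquet       # file metadata only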
diff --git a/parquet/bitstring.py b/parquet/bitstring.py
new file mode 100644
index 0000000..b85b4f1
--- /dev/null
+++ b/parquet/bitstring.py
@@ -0,0 +1,20 @@
+
+SINGLE_BIT_MASK = [1 << x for x in range(7, -1, -1)]
+
+
+class BitString(object):
+
+    def __init__(self, bytes, length=None, offset=None):
+        self.bytes = bytes
+        self.offset = offset if offset is not None else 0
+        self.length = length if length is not None else 8 * len(bytes) - self.offset
+
+    def __getitem__(self, key):
+        try:
+            start = key.start
+            stop = key.stop
+        except AttributeError:
+            if key < 0 or key >= self.length:
+                raise IndexError()
+            byte_index, bit_offset = divmod(self.offset + key, 8)
+            return self.bytes[byte_index] & SINGLE_BIT_MASK[bit_offset]
diff --git a/parquet/encoding.py b/parquet/encoding.py
new file mode 100644
index 0000000..7092ed4
--- /dev/null
+++ b/parquet/encoding.py
@@ -0,0 +1,228 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+import array
+import io
+import math
+import os
+import struct
+import logging
+
+import thriftpy
+
+THRIFT_FILE = os.path.join(os.path.dirname(__file__), "parquet.thrift")
+parquet_thrift = thriftpy.load(THRIFT_FILE, module_name=str("parquet_thrift"))
+
+logger = logging.getLogger("parquet")
+
+
+def read_plain_boolean(fo, count):
+    """Reads `count` booleans using the plain encoding"""
+    # for bit packed, the count is stored shifted up. But we want to pass in a count,
+    # so we shift up.
+    # bit width is 1 for a single-bit boolean.
+    return read_bitpacked(fo, count << 1, 1, logger.isEnabledFor(logging.DEBUG))
+
+
+def read_plain_int32(fo, count):
+    """Reads `count` 32-bit ints using the plain encoding"""
+    length = 4 * count
+    data = fo.read(length)
+    if len(data) != length:
+        raise EOFError("Expected {} bytes but got {} bytes".format(length, len(data)))
+    res = struct.unpack("<{}i".format(count), data)
+    return res
+
+
+def read_plain_int64(fo, count):
+    """Reads `count` 64-bit ints using the plain encoding"""
+    return struct.unpack("<{}q".format(count), fo.read(8 * count))
+
+
+def read_plain_int96(fo, count):
+    """Reads `count` 96-bit ints using the plain encoding"""
+    items = struct.unpack("<qi" * count, fo.read(12) * count)
+    args = [iter(items)] * 2
+    return [q << 32 | i for (q, i) in zip(*args)]
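+
+# Note: INT96 is most commonly used for legacy Impala/Hive timestamps; the
+# twelve raw bytes are combined here into a single integer.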
+
+
+def read_plain_float(fo, count):
+    """Reads `count` 32-bit floats using the plain encoding"""
+    return struct.unpack("<{}f".format(count), fo.read(4 * count))
+
+
+def read_plain_double(fo, count):
+    """Reads `count` 64-bit float (double) using the plain encoding"""
+    return struct.unpack("<{}d".format(count), fo.read(8 * count))
+
+
+def read_plain_byte_array(fo, count):
+    """Read `count` byte arrays using the plain encoding"""
+    return [fo.read(struct.unpack("<i", fo.read(4))[0]) for i in range(count)]
+
+
+def read_plain_byte_array_fixed(fo, fixed_length):
+    """Reads a byte array of the given fixed_length"""
+    return fo.read(fixed_length)
+
+
+DECODE_PLAIN = {
+    parquet_thrift.Type.BOOLEAN: read_plain_boolean,
+    parquet_thrift.Type.INT32: read_plain_int32,
+    parquet_thrift.Type.INT64: read_plain_int64,
+    parquet_thrift.Type.INT96: read_plain_int96,
+    parquet_thrift.Type.FLOAT: read_plain_float,
+    parquet_thrift.Type.DOUBLE: read_plain_double,
+    parquet_thrift.Type.BYTE_ARRAY: read_plain_byte_array,
+    parquet_thrift.Type.FIXED_LEN_BYTE_ARRAY: read_plain_byte_array_fixed
+}
+
+
+def read_plain(fo, type_, count):
+    """Reads `count` items `type` from the fo using the plain encoding."""
+    if count == 0:
+        return []
+    conv = DECODE_PLAIN[type_]
+    return conv(fo, count)
+
+
+def read_unsigned_var_int(fo):
+    result = 0
+    shift = 0
+    while True:
+        byte = struct.unpack("<B", fo.read(1))[0]
+        result |= ((byte & 0x7F) << shift)
+        if (byte & 0x80) == 0:
+            break
+        shift += 7
+    return result
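+
+# Example: the bytes b"\xac\x02" decode to (0xac & 0x7f) | (0x02 << 7) = 300;
+# the high bit of each byte marks a continuation.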
+
+
+def read_rle(fo, header, bit_width, debug_logging):
+    """Read a run-length encoded run from the given fo with the given header
+    and bit_width.
+
+    The count is determined from the header and the width is used to grab the
+    value that's repeated. Yields the value repeated count times.
+    """
+    count = header >> 1
+    zero_data = b"\x00\x00\x00\x00"
+    width = (bit_width + 7) // 8
+    data = fo.read(width)
+    data = data + zero_data[len(data):]
+    value = struct.unpack("<i", data)[0]
+    if debug_logging:
+        logger.debug("Read RLE group with value %s of byte-width %s and count %s",
+                     value, width, count)
+    for i in range(count):
+        yield value
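+
+# Example: an RLE run with header 0x08 (count = 0x08 >> 1 = 4) and bit_width 3
+# reads a single byte for the value and yields that value four times.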
+
+
+def width_from_max_int(value):
+    """Converts the value specified to a bit_width."""
+    return int(math.ceil(math.log(value + 1, 2)))
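+
+# Example: width_from_max_int(5) == 3, since ceil(log2(5 + 1)) == 3.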
+
+
+def _mask_for_bits(i):
+    """Helper function for read_bitpacked to generage a mask to grab i bits."""
+    return (1 << i) - 1
+
+
+def read_bitpacked(fo, header, width, debug_logging):
+    """Reads a bitpacked run of the rle/bitpack hybrid.
+
+    Supports width >8 (crossing bytes).
+    """
+    num_groups = header >> 1
+    count = num_groups * 8
+    byte_count = (width * count) // 8
+    if debug_logging:
+        logger.debug("Reading a bit-packed run with: %s groups, count %s, bytes %s",
+            num_groups, count, byte_count)
+    raw_bytes = array.array(str('B'), fo.read(byte_count)).tolist()
+    current_byte = 0
+    b = raw_bytes[current_byte]
+    mask = _mask_for_bits(width)
+    bits_wnd_l = 8
+    bits_wnd_r = 0
+    res = []
+    total = len(raw_bytes) * 8
+    while total >= width:
+        # TODO zero-padding could produce extra zero-values
+        if debug_logging:
+            logger.debug("  read bitpacked: width=%s window=(%s %s) b=%s,"
+                         " current_byte=%s",
+                         width, bits_wnd_l, bits_wnd_r, bin(b), current_byte)
+        if bits_wnd_r >= 8:
+            bits_wnd_r -= 8
+            bits_wnd_l -= 8
+            b >>= 8
+        elif bits_wnd_l - bits_wnd_r >= width:
+            res.append((b >> bits_wnd_r) & mask)
+            total -= width
+            bits_wnd_r += width
+            if debug_logging:
+                logger.debug("  read bitpackage: added: %s", res[-1])
+        elif current_byte + 1 < len(raw_bytes):
+            current_byte += 1
+            b |= (raw_bytes[current_byte] << bits_wnd_l)
+            bits_wnd_l += 8
+    return res
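+
+# Example: a bit-packed run with header 0x03 (num_groups = 0x03 >> 1 = 1) and
+# width 3 reads 3 bytes and unpacks them into 8 three-bit values.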
+
+
+def read_bitpacked_deprecated(fo, byte_count, count, width, debug_logging):
+    raw_bytes = array.array(str('B'), fo.read(byte_count)).tolist()
... 1116 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-parquet.git


