[Python-modules-commits] [python-parquet] 01/08: New upstream version 1.1
ChangZhuo Chen
czchen at moszumanska.debian.org
Tue Sep 20 03:33:57 UTC 2016
This is an automated email from the git hooks/post-receive script.
czchen pushed a commit to branch master
in repository python-parquet.
commit ce128f075b66216c8daff74225f4f04109d1f5c2
Author: ChangZhuo Chen (陳昌倬) <czchen at debian.org>
Date: Tue Sep 20 10:54:50 2016 +0800
New upstream version 1.1
---
MANIFEST.in | 1 +
PKG-INFO | 22 ++
README.rst | 83 +++++
parquet.egg-info/PKG-INFO | 22 ++
parquet.egg-info/SOURCES.txt | 18 ++
parquet.egg-info/dependency_links.txt | 1 +
parquet.egg-info/entry_points.txt | 3 +
parquet.egg-info/requires.txt | 5 +
parquet.egg-info/top_level.txt | 1 +
parquet/__init__.py | 480 ++++++++++++++++++++++++++++
parquet/__main__.py | 61 ++++
parquet/bitstring.py | 20 ++
parquet/encoding.py | 228 ++++++++++++++
parquet/parquet.thrift | 573 ++++++++++++++++++++++++++++++++++
parquet/schema.py | 48 +++
parquet/thrift_filetransport.py | 17 +
setup.cfg | 5 +
setup.py | 44 +++
test/test_encoding.py | 135 ++++++++
test/test_read_support.py | 199 ++++++++++++
20 files changed, 1966 insertions(+)
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..89e8b96
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1 @@
+recursive-include parquet *.thrift
diff --git a/PKG-INFO b/PKG-INFO
new file mode 100644
index 0000000..836d1cf
--- /dev/null
+++ b/PKG-INFO
@@ -0,0 +1,22 @@
+Metadata-Version: 1.1
+Name: parquet
+Version: 1.1
+Summary: Python support for Parquet file format
+Home-page: https://github.com/jcrobak/parquet-python
+Author: Joe Crobak
+Author-email: joecrow at gmail.com
+License: Apache License 2.0
+Description: UNKNOWN
+Platform: UNKNOWN
+Classifier: Development Status :: 3 - Alpha
+Classifier: Intended Audience :: Developers
+Classifier: Intended Audience :: System Administrators
+Classifier: License :: OSI Approved :: Apache Software License
+Classifier: Programming Language :: Python
+Classifier: Programming Language :: Python :: 2
+Classifier: Programming Language :: Python :: 3
+Classifier: Programming Language :: Python :: 2.7
+Classifier: Programming Language :: Python :: 3.4
+Classifier: Programming Language :: Python :: 3.5
+Classifier: Programming Language :: Python :: Implementation :: CPython
+Classifier: Programming Language :: Python :: Implementation :: PyPy
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..72acb4e
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,83 @@
+parquet-python
+==============
+
+parquet-python is a pure-python implementation (currently with only
+read-support) of the `parquet
+format <https://github.com/Parquet/parquet-format>`_. It comes with a
+script for reading parquet files and outputting the data to stdout as
+JSON or TSV (without the overhead of JVM startup). Performance has not
+yet been optimized, but it's useful for debugging and quick viewing of
+data in files.
+
+Not all parts of the parquet-format have been implemented or tested yet
+(e.g. nested data); see Todos below for a full list. With that said,
+parquet-python is capable of reading all the data files from the
+`parquet-compatibility <https://github.com/Parquet/parquet-compatibility>`_
+project.
+
+requirements
+============
+
+parquet-python has been tested on Python 2.7, 3.4, and 3.5. It depends
+on ``thriftpy`` (>= 0.3.6) and ``python-snappy`` (for Snappy-compressed files).
+
+getting started
+===============
+
+parquet-python is available via PyPI and can be installed using
+``pip install parquet``. The package includes the ``parquet``
+command for reading parquet files, e.g. ``parquet test.parquet``.
+See ``parquet --help`` for full usage.
+
+Example
+-------
+
+parquet-python currently has two programmatic interfaces with similar
+functionality to Python's csv reader. First, it supports a DictReader
+which returns a dictionary per row. Second, it has a reader which
+returns a list of values for each row. Both functions require a file-like
+object and support an optional ``columns`` argument to read only the
+specified columns.
+
+.. code:: python
+
+
+ import parquet
+ import json
+
+ ## assuming parquet file with two rows and three columns:
+ ## foo bar baz
+ ## 1 2 3
+ ## 4 5 6
+
+ with open("test.parquet", "rb") as fo:
+ # prints:
+ # {"foo": 1, "bar": 2}
+ # {"foo": 4, "bar": 5}
+ for row in parquet.DictReader(fo, columns=['foo', 'bar']):
+ print(json.dumps(row))
+
+
+ with open("test.parquet", "rb") as fo:
+ # prints:
+ # 1,2
+ # 4,5
+ for row in parquet.reader(fo, columns=['foo', 'bar']):
+ print(",".join([str(r) for r in row]))
+
+Todos
+=====
+
+- Support the deprecated bitpacking
+- Fix handling of repetition-levels and definition-levels
+- Tests for nested schemas, null data
+- Support reading of data from HDFS via snakebite and/or webhdfs.
+- Implement writing
+- Performance evaluation and optimization (e.g. how does it compare to
+ the C++ and Java implementations?)
+
+Contributing
+============
+
+Contributing is done via pull requests. Please include tests with your
+changes and follow `PEP 8 <http://www.python.org/dev/peps/pep-0008/>`_.
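A minimal sketch (not from the upstream sources) of discovering the column names to pass to the ``columns`` argument described above, using the ``read_footer`` helper defined in parquet/__init__.py later in this diff; ``test.parquet`` is a placeholder path:

    import parquet

    # read_footer() returns the thrift FileMetaData object for the file.
    footer = parquet.read_footer("test.parquet")

    # Leaf columns are the schema elements that carry a physical type;
    # group/root elements have type None.
    print([s.name for s in footer.schema if s.type is not None])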
diff --git a/parquet.egg-info/PKG-INFO b/parquet.egg-info/PKG-INFO
new file mode 100644
index 0000000..836d1cf
--- /dev/null
+++ b/parquet.egg-info/PKG-INFO
@@ -0,0 +1,22 @@
+Metadata-Version: 1.1
+Name: parquet
+Version: 1.1
+Summary: Python support for Parquet file format
+Home-page: https://github.com/jcrobak/parquet-python
+Author: Joe Crobak
+Author-email: joecrow at gmail.com
+License: Apache License 2.0
+Description: UNKNOWN
+Platform: UNKNOWN
+Classifier: Development Status :: 3 - Alpha
+Classifier: Intended Audience :: Developers
+Classifier: Intended Audience :: System Administrators
+Classifier: License :: OSI Approved :: Apache Software License
+Classifier: Programming Language :: Python
+Classifier: Programming Language :: Python :: 2
+Classifier: Programming Language :: Python :: 3
+Classifier: Programming Language :: Python :: 2.7
+Classifier: Programming Language :: Python :: 3.4
+Classifier: Programming Language :: Python :: 3.5
+Classifier: Programming Language :: Python :: Implementation :: CPython
+Classifier: Programming Language :: Python :: Implementation :: PyPy
diff --git a/parquet.egg-info/SOURCES.txt b/parquet.egg-info/SOURCES.txt
new file mode 100644
index 0000000..d68cd75
--- /dev/null
+++ b/parquet.egg-info/SOURCES.txt
@@ -0,0 +1,18 @@
+MANIFEST.in
+README.rst
+setup.py
+parquet/__init__.py
+parquet/__main__.py
+parquet/bitstring.py
+parquet/encoding.py
+parquet/parquet.thrift
+parquet/schema.py
+parquet/thrift_filetransport.py
+parquet.egg-info/PKG-INFO
+parquet.egg-info/SOURCES.txt
+parquet.egg-info/dependency_links.txt
+parquet.egg-info/entry_points.txt
+parquet.egg-info/requires.txt
+parquet.egg-info/top_level.txt
+test/test_encoding.py
+test/test_read_support.py
\ No newline at end of file
diff --git a/parquet.egg-info/dependency_links.txt b/parquet.egg-info/dependency_links.txt
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/parquet.egg-info/dependency_links.txt
@@ -0,0 +1 @@
+
diff --git a/parquet.egg-info/entry_points.txt b/parquet.egg-info/entry_points.txt
new file mode 100644
index 0000000..e27f517
--- /dev/null
+++ b/parquet.egg-info/entry_points.txt
@@ -0,0 +1,3 @@
+[console_scripts]
+parquet = parquet.__main__:main
+
diff --git a/parquet.egg-info/requires.txt b/parquet.egg-info/requires.txt
new file mode 100644
index 0000000..aa0f656
--- /dev/null
+++ b/parquet.egg-info/requires.txt
@@ -0,0 +1,5 @@
+python-snappy
+thriftpy>=0.3.6
+
+[:python_version=="2.7"]
+backports.csv
diff --git a/parquet.egg-info/top_level.txt b/parquet.egg-info/top_level.txt
new file mode 100644
index 0000000..c1d924a
--- /dev/null
+++ b/parquet.egg-info/top_level.txt
@@ -0,0 +1 @@
+parquet
diff --git a/parquet/__init__.py b/parquet/__init__.py
new file mode 100644
index 0000000..356be2f
--- /dev/null
+++ b/parquet/__init__.py
@@ -0,0 +1,480 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+import gzip
+import io
+import json
+import logging
+import os
+import struct
+import sys
+from collections import defaultdict, OrderedDict
+
+import thriftpy
+from thriftpy.protocol.compact import TCompactProtocolFactory
+
+from .thrift_filetransport import TFileTransport
+from . import encoding
+from . import schema
+
+PY3 = sys.version_info > (3,)
+
+if PY3:
+ import csv
+else:
+ from backports import csv
+
+THRIFT_FILE = os.path.join(os.path.dirname(__file__), "parquet.thrift")
+parquet_thrift = thriftpy.load(THRIFT_FILE, module_name="parquet_thrift")
+
+
+
+logger = logging.getLogger("parquet")
+
+try:
+ import snappy
+except ImportError:
+ logger.warn(
+ "Couldn't import snappy. Support for snappy compression disabled.")
+
+
+class ParquetFormatException(Exception):
+ pass
+
+
+def _check_header_magic_bytes(fo):
+ "Returns true if the file-like obj has the PAR1 magic bytes at the header"
+ fo.seek(0, 0)
+ magic = fo.read(4)
+ return magic == b'PAR1'
+
+
+def _check_footer_magic_bytes(fo):
+ "Returns true if the file-like obj has the PAR1 magic bytes at the footer"
+ fo.seek(-4, 2) # seek to four bytes from the end of the file
+ magic = fo.read(4)
+ return magic == b'PAR1'
+
+
+def _get_footer_size(fo):
+ "Reads the footer size in bytes, which is serialized as little endian"
+ fo.seek(-8, 2)
+ tup = struct.unpack("<i", fo.read(4))
+ return tup[0]
+
+
+def _read_footer(fo):
+ """Reads the footer from the given file object, returning a FileMetaData
+ object. This method assumes that the fo references a valid parquet file"""
+ footer_size = _get_footer_size(fo)
+ if logger.isEnabledFor(logging.DEBUG):
+ logger.debug("Footer size in bytes: %s", footer_size)
+ fo.seek(-(8 + footer_size), 2) # seek to beginning of footer
+ tin = TFileTransport(fo)
+ pin = TCompactProtocolFactory().get_protocol(tin)
+ fmd = parquet_thrift.FileMetaData()
+ fmd.read(pin)
+ return fmd
+
+
+def _read_page_header(fo):
+ """Reads the page_header from the given fo"""
+ tin = TFileTransport(fo)
+ pin = TCompactProtocolFactory().get_protocol(tin)
+ ph = parquet_thrift.PageHeader()
+ ph.read(pin)
+ return ph
+
+
+def read_footer(filename):
+ """Reads and returns the FileMetaData object for the given file."""
+ with open(filename, 'rb') as fo:
+ if not _check_header_magic_bytes(fo) or \
+ not _check_footer_magic_bytes(fo):
+ raise ParquetFormatException("{0} is not a valid parquet file "
+ "(missing magic bytes)"
+ .format(filename))
+ return _read_footer(fo)
+
+
+def _get_name(type_, value):
+ """Returns the name for the given value of the given type_ unless value is
+ None, in which case it returns the string "None"."""
+ return type_._VALUES_TO_NAMES[value] if value is not None else "None"
+
+
+def _get_offset(cmd):
+ """Returns the offset into the cmd based upon if it's a dictionary page or
+ a data page"""
+ dict_offset = cmd.dictionary_page_offset
+ data_offset = cmd.data_page_offset
+ if dict_offset is None or data_offset < dict_offset:
+ return data_offset
+ return dict_offset
+
+
+def dump_metadata(filename, show_row_group_metadata, out=sys.stdout):
+ def println(value):
+ out.write(value + "\n")
+ footer = read_footer(filename)
+ println("File Metadata: {0}".format(filename))
+ println(" Version: {0}".format(footer.version))
+ println(" Num Rows: {0}".format(footer.num_rows))
+ println(" k/v metadata: ")
+ if footer.key_value_metadata and len(footer.key_value_metadata) > 0:
+ for kv in footer.key_value_metadata:
+ println(" {0}={1}".format(kv.key, kv.value))
+ else:
+ println(" (none)")
+ println(" schema: ")
+ for se in footer.schema:
+ println(" {name} ({type}): length={type_length}, "
+ "repetition={repetition_type}, "
+ "children={num_children}, "
+ "converted_type={converted_type}".format(
+ name=se.name,
+ type=parquet_thrift.Type._VALUES_TO_NAMES[se.type] if se.type is not None else None,
+ type_length=se.type_length,
+ repetition_type=_get_name(parquet_thrift.FieldRepetitionType,
+ se.repetition_type),
+ num_children=se.num_children,
+ converted_type=se.converted_type))
+ if show_row_group_metadata:
+ println(" row groups: ")
+ for rg in footer.row_groups:
+ num_rows = rg.num_rows
+ bytes = rg.total_byte_size
+ println(
+ " rows={num_rows}, bytes={bytes}".format(num_rows=num_rows,
+ bytes=bytes))
+ println(" chunks:")
+ for cg in rg.columns:
+ cmd = cg.meta_data
+ println(" type={type} file_offset={offset} "
+ "compression={codec} "
+ "encodings={encodings} path_in_schema={path_in_schema} "
+ "num_values={num_values} uncompressed_bytes={raw_bytes} "
+ "compressed_bytes={compressed_bytes} "
+ "data_page_offset={data_page_offset} "
+ "dictionary_page_offset={dictionary_page_offset}".format(
+ type=_get_name(parquet_thrift.Type, cmd.type),
+ offset=cg.file_offset,
+ codec=_get_name(parquet_thrift.CompressionCodec, cmd.codec),
+ encodings=",".join(
+ [_get_name(
+ parquet_thrift.Encoding, s) for s in cmd.encodings]),
+ path_in_schema=cmd.path_in_schema,
+ num_values=cmd.num_values,
+ raw_bytes=cmd.total_uncompressed_size,
+ compressed_bytes=cmd.total_compressed_size,
+ data_page_offset=cmd.data_page_offset,
+ dictionary_page_offset=cmd.dictionary_page_offset))
+ with open(filename, 'rb') as fo:
+ offset = _get_offset(cmd)
+ fo.seek(offset, 0)
+ values_read = 0
+ println(" pages: ")
+ while values_read < num_rows:
+ ph = _read_page_header(fo)
+ # seek past current page.
+ fo.seek(ph.compressed_page_size, 1)
+ daph = ph.data_page_header
+ type_ = _get_name(parquet_thrift.PageType, ph.type)
+ raw_bytes = ph.uncompressed_page_size
+ num_values = None
+ if ph.type == parquet_thrift.PageType.DATA_PAGE:
+ num_values = daph.num_values
+ values_read += num_values
+ if ph.type == parquet_thrift.PageType.DICTIONARY_PAGE:
+ pass
+ #num_values = diph.num_values
+
+ encoding_type = None
+ def_level_encoding = None
+ rep_level_encoding = None
+ if daph:
+ encoding_type = _get_name(parquet_thrift.Encoding, daph.encoding)
+ def_level_encoding = _get_name(
+ parquet_thrift.Encoding, daph.definition_level_encoding)
+ rep_level_encoding = _get_name(
+ parquet_thrift.Encoding, daph.repetition_level_encoding)
+
+ println(" page header: type={type} "
+ "uncompressed_size={raw_bytes} "
+ "num_values={num_values} encoding={encoding} "
+ "def_level_encoding={def_level_encoding} "
+ "rep_level_encoding={rep_level_encoding}".format(
+ type=type_,
+ raw_bytes=raw_bytes,
+ num_values=num_values,
+ encoding=encoding_type,
+ def_level_encoding=def_level_encoding,
+ rep_level_encoding=rep_level_encoding))
+
+
+def _read_page(fo, page_header, column_metadata):
+ """Internal function to read the data page from the given file-object
+ and convert it to raw, uncompressed bytes (if necessary)."""
+ bytes_from_file = fo.read(page_header.compressed_page_size)
+ codec = column_metadata.codec
+ if codec is not None and codec != parquet_thrift.CompressionCodec.UNCOMPRESSED:
+ if column_metadata.codec == parquet_thrift.CompressionCodec.SNAPPY:
+ raw_bytes = snappy.decompress(bytes_from_file)
+ elif column_metadata.codec == parquet_thrift.CompressionCodec.GZIP:
+ io_obj = io.BytesIO(bytes_from_file)
+ with gzip.GzipFile(fileobj=io_obj, mode='rb') as f:
+ raw_bytes = f.read()
+ else:
+ raise ParquetFormatException(
+ "Unsupported Codec: {0}".format(codec))
+ else:
+ raw_bytes = bytes_from_file
+
+ if logger.isEnabledFor(logging.DEBUG):
+ logger.debug(
+ "Read page with compression type {0}. Bytes {1} -> {2}".format(
+ _get_name(parquet_thrift.CompressionCodec, codec),
+ page_header.compressed_page_size,
+ page_header.uncompressed_page_size))
+ assert len(raw_bytes) == page_header.uncompressed_page_size, \
+ "found {0} raw bytes (expected {1})".format(
+ len(raw_bytes),
+ page_header.uncompressed_page_size)
+ return raw_bytes
+
+
+def _read_data(fo, fo_encoding, value_count, bit_width):
+ """Internal method to read data from the file-object using the given
+ encoding. The data could be definition levels, repetition levels, or
+ actual values.
+ """
+ vals = []
+ if fo_encoding == parquet_thrift.Encoding.RLE:
+ seen = 0
+ while seen < value_count:
+ values = encoding.read_rle_bit_packed_hybrid(fo, bit_width)
+ if values is None:
+ break # EOF was reached.
+ vals += values
+ seen += len(values)
+ elif fo_encoding == parquet_thrift.Encoding.BIT_PACKED:
+ raise NotImplementedError("Bit packing not yet supported")
+
+ return vals
+
+
+def read_data_page(fo, schema_helper, page_header, column_metadata,
+ dictionary):
+ """Reads the datapage from the given file-like object based upon the
+ metadata in the schema_helper, page_header, column_metadata, and
+ (optional) dictionary. Returns a list of values.
+ """
+ daph = page_header.data_page_header
+ raw_bytes = _read_page(fo, page_header, column_metadata)
+ io_obj = io.BytesIO(raw_bytes)
+ vals = []
+ debug_logging = logger.isEnabledFor(logging.DEBUG)
+
+ if debug_logging:
+ logger.debug(" definition_level_encoding: %s",
+ _get_name(parquet_thrift.Encoding, daph.definition_level_encoding))
+ logger.debug(" repetition_level_encoding: %s",
+ _get_name(parquet_thrift.Encoding, daph.repetition_level_encoding))
+ logger.debug(" encoding: %s", _get_name(parquet_thrift.Encoding, daph.encoding))
+
+ # definition levels are skipped if data is required.
+ definition_levels = None
+ num_nulls = 0
+ max_definition_level = -1
+ if not schema_helper.is_required(column_metadata.path_in_schema[-1]):
+ max_definition_level = schema_helper.max_definition_level(
+ column_metadata.path_in_schema)
+ bit_width = encoding.width_from_max_int(max_definition_level)
+ if debug_logging:
+ logger.debug(" max def level: %s bit_width: %s",
+ max_definition_level, bit_width)
+ if bit_width == 0:
+ definition_levels = [0] * daph.num_values
+ else:
+ definition_levels = _read_data(io_obj,
+ daph.definition_level_encoding,
+ daph.num_values,
+ bit_width)[:daph.num_values]
+
+ # anything that isn't at the max definition level is a null.
+ num_nulls = len(definition_levels) - definition_levels.count(max_definition_level)
+ if debug_logging:
+ logger.debug(" Definition levels: %s", len(definition_levels))
+
+ # repetition levels are skipped if data is at the first level.
+ repetition_levels = None
+ if len(column_metadata.path_in_schema) > 1:
+ max_repetition_level = schema_helper.max_repetition_level(
+ column_metadata.path_in_schema)
+ bit_width = encoding.width_from_max_int(max_repetition_level)
+ repetition_levels = _read_data(io_obj,
+ daph.repetition_level_encoding,
+ daph.num_values,
+ bit_width)
+
+ # TODO Actually use the repetition levels.
+ if daph.encoding == parquet_thrift.Encoding.PLAIN:
+ read_values = \
+ encoding.read_plain(io_obj, column_metadata.type, daph.num_values - num_nulls)
+ if definition_levels:
+ it = iter(read_values)
+ vals.extend([next(it) if level == max_definition_level else None for level in definition_levels])
+ else:
+ vals.extend(read_values)
+ if debug_logging:
+ logger.debug(" Values: %s, nulls: %s", len(vals), num_nulls)
+ elif daph.encoding == parquet_thrift.Encoding.PLAIN_DICTIONARY:
+ # bit_width is stored as single byte.
+ bit_width = struct.unpack("<B", io_obj.read(1))[0]
+ if debug_logging:
+ logger.debug("bit_width: %d", bit_width)
+ total_seen = 0
+ dict_values_bytes = io_obj.read()
+ dict_values_io_obj = io.BytesIO(dict_values_bytes)
+ # TODO jcrobak -- not sure that this loop is needed?
+ while total_seen < daph.num_values:
+ values = encoding.read_rle_bit_packed_hybrid(
+ dict_values_io_obj, bit_width, len(dict_values_bytes))
+ if len(values) + total_seen > daph.num_values:
+ values = values[0: daph.num_values - total_seen]
+ vals += [dictionary[v] for v in values]
+ total_seen += len(values)
+ else:
+ raise ParquetFormatException("Unsupported encoding: {0}".format(
+ _get_name(parquet_thrift.Encoding, daph.encoding)))
+ return vals
+
+
+def read_dictionary_page(fo, page_header, column_metadata):
+ raw_bytes = _read_page(fo, page_header, column_metadata)
+ io_obj = io.BytesIO(raw_bytes)
+ return encoding.read_plain(io_obj, column_metadata.type,
+ page_header.dictionary_page_header.num_values)
+
+
+def DictReader(fo, columns=None):
+ """
+ Reader for a parquet file object.
+
+ This function is a generator returning an OrderedDict for each row
+ of data in the parquet file. Nested values will be flattened into the
+ top-level dict and can be referenced with '.' notation (e.g. 'foo' -> 'bar'
+ is referenced as 'foo.bar')
+
+ :param fo: the file containing parquet data
+ :param columns: the columns to include. If None (default), all columns
+ are included. Nested values are referenced with "." notation
+ """
+ footer = _read_footer(fo)
+ keys = columns if columns else [s.name for s in
+ footer.schema if s.type]
+
+ for row in reader(fo, columns):
+ yield OrderedDict(zip(keys, row))
+
+def reader(fo, columns=None):
+ """
+ Reader for a parquet file object.
+
+ This function is a generator returning a list of values for each row
+ of data in the parquet file.
+
+ :param fo: the file containing parquet data
+ :param columns: the columns to include. If None (default), all columns
+ are included. Nested values are referenced with "." notation
+ """
+ footer = _read_footer(fo)
+ schema_helper = schema.SchemaHelper(footer.schema)
+ keys = columns if columns else [s.name for s in
+ footer.schema if s.type]
+ debug_logging = logger.isEnabledFor(logging.DEBUG)
+ for rg in footer.row_groups:
+ res = defaultdict(list)
+ row_group_rows = rg.num_rows
+ for idx, cg in enumerate(rg.columns):
+ dict_items = []
+ cmd = cg.meta_data
+ # skip if the list of columns is specified and this isn't in it
+ if columns and not ".".join(cmd.path_in_schema) in columns:
+ continue
+
+ offset = _get_offset(cmd)
+ fo.seek(offset, 0)
+ values_seen = 0
+ if debug_logging:
+ logger.debug("reading column chunk of type: %s",
+ _get_name(parquet_thrift.Type, cmd.type))
+ while values_seen < row_group_rows:
+ ph = _read_page_header(fo)
+ if debug_logging:
+ logger.debug("Reading page (type=%s, "
+ "uncompressed=%s bytes, "
+ "compressed=%s bytes)",
+ _get_name(parquet_thrift.PageType, ph.type),
+ ph.uncompressed_page_size,
+ ph.compressed_page_size)
+
+ if ph.type == parquet_thrift.PageType.DATA_PAGE:
+ values = read_data_page(fo, schema_helper, ph, cmd,
+ dict_items)
+ res[".".join(cmd.path_in_schema)] += values
+ values_seen += ph.data_page_header.num_values
+ elif ph.type == parquet_thrift.PageType.DICTIONARY_PAGE:
+ if debug_logging:
+ logger.debug(ph)
+ assert dict_items == []
+ dict_items = read_dictionary_page(fo, ph, cmd)
+ if debug_logging:
+ logger.debug("Dictionary: %s", str(dict_items))
+ else:
+ logger.warn("Skipping unknown page type={0}".format(
+ _get_name(parquet_thrift.PageType, ph.type)))
+
+ for i in range(rg.num_rows):
+ yield [res[k][i] for k in keys if res[k]]
+
+class JsonWriter(object):
+ def __init__(self, out):
+ self._out = out
+
+ def writerow(self, row):
+ json_text = json.dumps(row)
+ if type(json_text) is bytes:
+ json_text = json_text.decode('utf-8')
+ self._out.write(json_text)
+ self._out.write(u'\n')
+
+def _dump(fo, options, out=sys.stdout):
+
+ # writer and keys are lazily loaded. We don't know the keys until we have
+ # the first item. And we need the keys for the csv writer.
+ total_count = 0
+ writer = None
+ keys = None
+ for row in DictReader(fo, options.col):
+ if not keys:
+ keys = row.keys()
+ if not writer:
+ writer = csv.DictWriter(out, keys, delimiter=u'\t', quotechar=u'\'',
+ quoting=csv.QUOTE_MINIMAL) if options.format == 'csv' \
+ else JsonWriter(out) if options.format == 'json' \
+ else None
+ if total_count == 0 and options.format == "csv" and not options.no_headers:
+ writer.writeheader()
+ if options.limit != -1 and total_count >= options.limit:
+ return
+ row_unicode = {k: v.decode("utf-8") if type(v) is bytes else v for k, v in row.items()}
+ writer.writerow(row_unicode)
+ total_count += 1
+
+
+def dump(filename, options, out=sys.stdout):
+ with open(filename, 'rb') as fo:
+ return _dump(fo, options=options, out=out)
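The footer helpers above (_check_footer_magic_bytes, _get_footer_size, _read_footer) rely on the parquet trailer layout: the file starts with the magic b'PAR1' and ends with a 4-byte little-endian footer length followed by the same 4-byte magic. A minimal standalone sketch of that check; footer_info and the path are illustrative, not part of the module:

    import struct

    def footer_info(path):
        # Returns (looks_like_parquet, footer_length_in_bytes).
        with open(path, "rb") as fo:
            header_magic = fo.read(4)
            fo.seek(-8, 2)  # last 8 bytes: <footer length><magic>
            footer_len = struct.unpack("<i", fo.read(4))[0]
            footer_magic = fo.read(4)
        return header_magic == b"PAR1" and footer_magic == b"PAR1", footer_len

    print(footer_info("test.parquet"))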
diff --git a/parquet/__main__.py b/parquet/__main__.py
new file mode 100644
index 0000000..9604fff
--- /dev/null
+++ b/parquet/__main__.py
@@ -0,0 +1,61 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+
+import argparse
+import logging
+import sys
+
+
+def setup_logging(options=None):
+ level = logging.DEBUG if options is not None and options.debug \
+ else logging.WARNING
+ console = logging.StreamHandler()
+ console.setLevel(level)
+ formatter = logging.Formatter('%(name)s: %(levelname)-8s %(message)s')
+ console.setFormatter(formatter)
+ logging.getLogger('parquet').setLevel(level)
+ logging.getLogger('parquet').addHandler(console)
+
+
+def main(argv=None):
+ argv = argv or sys.argv[1:]
+
+ parser = argparse.ArgumentParser('parquet',
+ description='Read parquet files')
+ parser.add_argument('--metadata', action='store_true',
+ help='show metadata on file')
+ parser.add_argument('--row-group-metadata', action='store_true',
+ help="show per row group metadata")
+ parser.add_argument('--no-data', action='store_true',
+ help="don't dump any data from the file")
+ parser.add_argument('--limit', action='store', type=int, default=-1,
+ help='max records to output')
+ parser.add_argument('--col', action='append', type=str,
+ help='only include this column (can be '
+ 'specified multiple times)')
+ parser.add_argument('--no-headers', action='store_true',
+ help='skip headers in output (only applies if '
+ 'format=csv)')
+ parser.add_argument('--format', action='store', type=str, default='csv',
+ help='format for the output data. can be csv or json.')
+ parser.add_argument('--debug', action='store_true',
+ help='log debug info to stderr')
+ parser.add_argument('file',
+ help='path to the file to parse')
+
+ args = parser.parse_args(argv)
+
+ setup_logging(args)
+
+ import parquet
+
+ if args.metadata:
+ parquet.dump_metadata(args.file, args.row_group_metadata)
+ if not args.no_data:
+ parquet.dump(args.file, args)
+
+if __name__ == '__main__':
+ main()
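main() above takes an explicit argv list, so the console script registered in entry_points.txt can also be driven programmatically. A small sketch under that assumption; the file name and column are placeholders:

    from parquet.__main__ import main

    # Equivalent to:
    #   parquet --metadata --format json --limit 5 --col foo test.parquet
    main(["--metadata", "--format", "json", "--limit", "5",
          "--col", "foo", "test.parquet"])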
diff --git a/parquet/bitstring.py b/parquet/bitstring.py
new file mode 100644
index 0000000..b85b4f1
--- /dev/null
+++ b/parquet/bitstring.py
@@ -0,0 +1,20 @@
+
+SINGLE_BIT_MASK = [1 << x for x in range(7, -1, -1)]
+
+class BitString(object):
+
+ def __init__(self, bytes, length=None, offset=None):
+ self.bytes = bytes
+ self.offset = offset if offset is not None else 0
+ self.length = length if length is not None else 8 * len(bytes) - self.offset
+
+
+ def __getitem__(self, key):
+ try:
+ start = key.start
+ stop = key.stop
+ except AttributeError:
+ if key < 0 or key >= self.length:
+ raise IndexError()
+ byte_index, bit_offset = divmod(self.offset + key, 8)
+ return self.bytes[byte_index] & SINGLE_BIT_MASK[bit_offset]
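BitString indexes single bits most-significant-bit first within each byte (SINGLE_BIT_MASK runs from bit 7 down to bit 0), and __getitem__ returns the masked byte, i.e. a non-zero value when the bit is set. A small usage sketch, assuming the corrected indexing above and Python 3 bytes:

    from parquet.bitstring import BitString

    bits = BitString(b"\xa0")  # 0b10100000
    # MSB-first: bit 0 and bit 2 are set.
    print(bool(bits[0]), bool(bits[1]), bool(bits[2]))  # True False True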
diff --git a/parquet/encoding.py b/parquet/encoding.py
new file mode 100644
index 0000000..7092ed4
--- /dev/null
+++ b/parquet/encoding.py
@@ -0,0 +1,228 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+import array
+import io
+import math
+import os
+import struct
+import logging
+
+import thriftpy
+
+THRIFT_FILE = os.path.join(os.path.dirname(__file__), "parquet.thrift")
+parquet_thrift = thriftpy.load(THRIFT_FILE, module_name=str("parquet_thrift"))
+
+logger = logging.getLogger("parquet")
+
+
+def read_plain_boolean(fo, count):
+ """Reads `count` booleans using the plain encoding"""
+ # read_bitpacked() expects the count shifted up by one bit in its header
+ # argument, so shift before passing it in.
+ # bit width is 1 for a single-bit boolean.
+ return read_bitpacked(fo, count << 1, 1, logger.isEnabledFor(logging.DEBUG))
+
+
+def read_plain_int32(fo, count):
+ """Reads `count` 32-bit ints using the plain encoding"""
+ length = 4 * count
+ data = fo.read(length)
+ if len(data) != length:
+ raise EOFError("Expected {} bytes but got {} bytes".format(length, len(data)))
+ res = struct.unpack("<{}i".format(count), data)
+ return res
+
+
+def read_plain_int64(fo, count):
+ """Reads `count` 64-bit ints using the plain encoding"""
+ return struct.unpack("<{}q".format(count), fo.read(8 * count))
+
+
+def read_plain_int96(fo, count):
+ """Reads `count` 96-bit ints using the plain encoding"""
+ items = struct.unpack("<qi" * count, fo.read(12 * count))
+ args = [iter(items)] * 2
+ return [q << 32 | i for (q, i) in zip(*args)]
+
+
+def read_plain_float(fo, count):
+ """Reads `count` 32-bit floats using the plain encoding"""
+ return struct.unpack("<{}f".format(count), fo.read(4 * count))
+
+
+def read_plain_double(fo, count):
+ """Reads `count` 64-bit floats (doubles) using the plain encoding"""
+ return struct.unpack("<{}d".format(count), fo.read(8 * count))
+
+
+def read_plain_byte_array(fo, count):
+ """Read `count` byte arrays using the plain encoding"""
+ return [fo.read(struct.unpack("<i", fo.read(4))[0]) for i in range(count)]
+
+
+def read_plain_byte_array_fixed(fo, fixed_length):
+ """Reads a byte array of the given fixed_length"""
+ return fo.read(fixed_length)
+
+
+DECODE_PLAIN = {
+ parquet_thrift.Type.BOOLEAN: read_plain_boolean,
+ parquet_thrift.Type.INT32: read_plain_int32,
+ parquet_thrift.Type.INT64: read_plain_int64,
+ parquet_thrift.Type.INT96: read_plain_int96,
+ parquet_thrift.Type.FLOAT: read_plain_float,
+ parquet_thrift.Type.DOUBLE: read_plain_double,
+ parquet_thrift.Type.BYTE_ARRAY: read_plain_byte_array,
+ parquet_thrift.Type.FIXED_LEN_BYTE_ARRAY: read_plain_byte_array_fixed
+}
+
+
+def read_plain(fo, type_, count):
+ """Reads `count` items of the given `type_` from the fo using the plain encoding."""
+ if count == 0:
+ return []
+ conv = DECODE_PLAIN[type_]
+ return conv(fo, count)
+
+
+def read_unsigned_var_int(fo):
+ result = 0
+ shift = 0
+ while True:
+ byte = struct.unpack("<B", fo.read(1))[0]
+ result |= ((byte & 0x7F) << shift)
+ if (byte & 0x80) == 0:
+ break
+ shift += 7
+ return result
+
+
+def read_rle(fo, header, bit_width, debug_logging):
+ """Read a run-length encoded run from the given fo with the given header
+ and bit_width.
+
+ The count is determined from the header and the width is used to grab the
+ value that's repeated. Yields the value repeated count times.
+ """
+ count = header >> 1
+ zero_data = b"\x00\x00\x00\x00"
+ width = (bit_width + 7) // 8
+ data = fo.read(width)
+ data = data + zero_data[len(data):]
+ value = struct.unpack("<i", data)[0]
+ if debug_logging:
+ logger.debug("Read RLE group with value %s of byte-width %s and count %s",
+ value, width, count)
+ for i in range(count):
+ yield value
+
+
+def width_from_max_int(value):
+ """Converts the value specified to a bit_width."""
+ return int(math.ceil(math.log(value + 1, 2)))
+
+
+def _mask_for_bits(i):
+ """Helper function for read_bitpacked to generate a mask to grab i bits."""
+ return (1 << i) - 1
+
+
+def read_bitpacked(fo, header, width, debug_logging):
+ """Reads a bitpacked run of the rle/bitpack hybrid.
+
+ Supports width >8 (crossing bytes).
+ """
+ num_groups = header >> 1
+ count = num_groups * 8
+ byte_count = (width * count) // 8
+ if debug_logging:
+ logger.debug("Reading a bit-packed run with: %s groups, count %s, bytes %s",
+ num_groups, count, byte_count)
+ raw_bytes = array.array(str('B'), fo.read(byte_count)).tolist()
+ current_byte = 0
+ b = raw_bytes[current_byte]
+ mask = _mask_for_bits(width)
+ bits_wnd_l = 8
+ bits_wnd_r = 0
+ res = []
+ total = len(raw_bytes) * 8
+ while total >= width:
+ # TODO zero-padding could produce extra zero-values
+ if debug_logging:
+ logger.debug(" read bitpacked: width=%s window=(%s %s) b=%s,"
+ " current_byte=%s",
+ width, bits_wnd_l, bits_wnd_r, bin(b), current_byte)
+ if bits_wnd_r >= 8:
+ bits_wnd_r -= 8
+ bits_wnd_l -= 8
+ b >>= 8
+ elif bits_wnd_l - bits_wnd_r >= width:
+ res.append((b >> bits_wnd_r) & mask)
+ total -= width
+ bits_wnd_r += width
+ if debug_logging:
+ logger.debug(" read bitpacked: added: %s", res[-1])
+ elif current_byte + 1 < len(raw_bytes):
+ current_byte += 1
+ b |= (raw_bytes[current_byte] << bits_wnd_l)
+ bits_wnd_l += 8
+ return res
+
+
+def read_bitpacked_deprecated(fo, byte_count, count, width, debug_logging):
+ raw_bytes = array.array(str('B'), fo.read(byte_count)).tolist()
... 1116 lines suppressed ...
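The encoding helpers above implement the two halves of Parquet's RLE/bit-packed hybrid: read_rle() repeats a single value, read_bitpacked() unpacks fixed-width values LSB-first, and read_rle_bit_packed_hybrid() (in the suppressed portion of this diff) dispatches between them based on the low bit of the varint header. A small worked sketch of the two decoders, with header and byte values constructed by hand:

    import io
    from parquet import encoding

    # RLE run: header >> 1 is the repeat count; the value occupies
    # ceil(bit_width / 8) bytes. Count 3, bit width 3, value 5:
    print(list(encoding.read_rle(io.BytesIO(b"\x05"), 6, 3, False)))
    # -> [5, 5, 5]

    # Bit-packed run: header >> 1 is the number of 8-value groups; with a
    # 1-bit width one group fits in a single byte, read LSB-first.
    print(encoding.read_bitpacked(io.BytesIO(b"\xb4"), 3, 1, False))
    # 0xb4 == 0b10110100 -> [0, 0, 1, 0, 1, 1, 0, 1]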
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-parquet.git