[Python-modules-commits] [partd] 02/04: New upstream version 0.3.8
Diane Trout
diane at moszumanska.debian.org
Fri Jun 2 05:31:58 UTC 2017
This is an automated email from the git hooks/post-receive script.
diane pushed a commit to branch master
in repository partd.
commit 900155ee92ff61a8ad83401cd384b2eff2261ea0
Author: Diane Trout <diane at ghic.org>
Date: Thu Jun 1 20:25:07 2017 -0700
New upstream version 0.3.8
---
LICENSE.txt | 28 ++++++++
MANIFEST.in | 1 +
PKG-INFO | 2 +-
partd.egg-info/PKG-INFO | 2 +-
partd.egg-info/SOURCES.txt | 1 +
partd/__init__.py | 2 +-
partd/file.py | 22 +++++--
partd/numpy.py | 50 ++++++---------
partd/pandas.py | 155 ++++++++++++++++++++++++++-------------------
partd/tests/test_file.py | 5 ++
partd/tests/test_numpy.py | 27 ++++----
partd/tests/test_pandas.py | 43 +++++++++++--
setup.py | 2 +-
13 files changed, 216 insertions(+), 124 deletions(-)
diff --git a/LICENSE.txt b/LICENSE.txt
new file mode 100644
index 0000000..990fa35
--- /dev/null
+++ b/LICENSE.txt
@@ -0,0 +1,28 @@
+Copyright (c) 2015, Continuum Analytics, Inc. and contributors
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+Redistributions of source code must retain the above copyright notice,
+this list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright notice,
+this list of conditions and the following disclaimer in the documentation
+and/or other materials provided with the distribution.
+
+Neither the name of Continuum Analytics nor the names of any contributors
+may be used to endorse or promote products derived from this software
+without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/MANIFEST.in b/MANIFEST.in
index e6971f4..673a2f4 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -3,4 +3,5 @@ recursive-include partd *.py
include requirements.txt
include setup.py
include README.rst
+include LICENSE.txt
include MANIFEST.in
diff --git a/PKG-INFO b/PKG-INFO
index ffc22f6..5f711b3 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.0
Name: partd
-Version: 0.3.7
+Version: 0.3.8
Summary: Appendable key-value storage
Home-page: http://github.com/dask/partd/
Author: Matthew Rocklin
diff --git a/partd.egg-info/PKG-INFO b/partd.egg-info/PKG-INFO
index ffc22f6..5f711b3 100644
--- a/partd.egg-info/PKG-INFO
+++ b/partd.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.0
Name: partd
-Version: 0.3.7
+Version: 0.3.8
Summary: Appendable key-value storage
Home-page: http://github.com/dask/partd/
Author: Matthew Rocklin
diff --git a/partd.egg-info/SOURCES.txt b/partd.egg-info/SOURCES.txt
index 42674af..98ea681 100644
--- a/partd.egg-info/SOURCES.txt
+++ b/partd.egg-info/SOURCES.txt
@@ -1,3 +1,4 @@
+LICENSE.txt
MANIFEST.in
README.rst
requirements.txt
diff --git a/partd/__init__.py b/partd/__init__.py
index bb7ccd8..9cccce5 100644
--- a/partd/__init__.py
+++ b/partd/__init__.py
@@ -16,4 +16,4 @@ with ignoring(ImportError):
from .zmq import Client, Server
-__version__ = '0.3.7'
+__version__ = '0.3.8'
diff --git a/partd/file.py b/partd/file.py
index c03df8a..5b814d2 100644
--- a/partd/file.py
+++ b/partd/file.py
@@ -1,18 +1,21 @@
from __future__ import absolute_import
-from .core import Interface
-import locket
-import tempfile
+import atexit
import os
import shutil
import string
+import tempfile
+
+from .core import Interface
+import locket
from .utils import ignoring
class File(Interface):
- def __init__(self, path=None):
+ def __init__(self, path=None, dir=None):
if not path:
- path = tempfile.mkdtemp('.partd')
+ path = tempfile.mkdtemp(suffix='.partd', dir=dir)
+ cleanup_files.append(path)
self._explicitly_given_path = False
else:
self._explicitly_given_path = True
@@ -139,3 +142,12 @@ def token(key):
return os.path.join(*map(token, key))
else:
return str(key)
+
+
+cleanup_files = list()
+
+@atexit.register
+def cleanup():
+ for fn in cleanup_files:
+ if os.path.exists(fn):
+ shutil.rmtree(fn)
diff --git a/partd/numpy.py b/partd/numpy.py
index df16259..ff6fc3c 100644
--- a/partd/numpy.py
+++ b/partd/numpy.py
@@ -6,10 +6,13 @@ description of the array's dtype.
"""
from __future__ import absolute_import
import numpy as np
-from toolz import valmap, concat, identity, partial
-from .compatibility import pickle, unicode
+from toolz import valmap, identity, partial
+from .compatibility import pickle
+from .core import Interface
+from .file import File
from .utils import frame, framesplit, suffix, ignoring
+
def serialize_dtype(dt):
""" Serialize dtype to bytes
@@ -36,10 +39,6 @@ def parse_dtype(s):
return np.dtype(s)
-from .core import Interface
-from .file import File
-
-
class Numpy(Interface):
def __init__(self, partd=None):
if not partd or isinstance(partd, str):
@@ -94,39 +93,28 @@ except ImportError:
def serialize(x):
if x.dtype == 'O':
+ l = x.flatten().tolist()
with ignoring(Exception): # Try msgpack (faster on strings)
- return frame(msgpack.packb(x.tolist()))
- return frame(pickle.dumps(x.tolist(), protocol=pickle.HIGHEST_PROTOCOL))
+ return frame(msgpack.packb(l, use_bin_type=True))
+ return frame(pickle.dumps(l, protocol=pickle.HIGHEST_PROTOCOL))
else:
return x.tobytes()
-def decode(o):
- if isinstance(o, list):
- if not o:
- return []
- elif isinstance(o[0], bytes):
- try:
- return [item.decode() for item in o]
- except AttributeError:
- return list(map(decode, o))
- else:
- return list(map(decode, o))
- elif isinstance(o, bytes):
- return o.decode()
- else:
- return o
-
def deserialize(bytes, dtype, copy=False):
if dtype == 'O':
try:
- l = list(concat(map(msgpack.unpackb, framesplit(bytes))))
- except:
- l = list(concat(map(pickle.loads, framesplit(bytes))))
-
- l = decode(l)
-
- return np.array(l, dtype='O')
+ blocks = [msgpack.unpackb(f, encoding='utf-8')
+ for f in framesplit(bytes)]
+ except Exception:
+ blocks = [pickle.loads(f) for f in framesplit(bytes)]
+
+ result = np.empty(sum(map(len, blocks)), dtype='O')
+ i = 0
+ for block in blocks:
+ result[i:i + len(block)] = block
+ i += len(block)
+ return result
else:
result = np.frombuffer(bytes, dtype)
if copy:
diff --git a/partd/pandas.py b/partd/pandas.py
index a919e22..b7c7aea 100644
--- a/partd/pandas.py
+++ b/partd/pandas.py
@@ -1,12 +1,16 @@
from __future__ import absolute_import
-import pandas as pd
-from toolz import valmap
from functools import partial
-from .compatibility import pickle
-from .numpy import Numpy
+
+import numpy as np
+import pandas as pd
+from pandas.core.internals import create_block_manager_from_blocks, make_block
+
+from . import numpy as pnp
from .core import Interface
-from .utils import extend
+from .compatibility import pickle
+from .encode import Encode
+from .utils import extend, framesplit, frame
dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
@@ -14,7 +18,7 @@ dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
class PandasColumns(Interface):
def __init__(self, partd=None):
- self.partd = Numpy(partd)
+ self.partd = pnp.Numpy(partd)
Interface.__init__(self)
def append(self, data, **kwargs):
@@ -73,84 +77,103 @@ class PandasColumns(Interface):
self.partd.__del__()
+def index_to_header_bytes(ind):
+ # These have special `__reduce__` methods, just use pickle
+ if isinstance(ind, (pd.DatetimeIndex,
+ pd.MultiIndex,
+ pd.RangeIndex)):
+ return None, dumps(ind)
-import pandas as pd
-from pandas.core.internals import create_block_manager_from_blocks, make_block
-from pandas.core.index import _ensure_index
-
+ if isinstance(ind, pd.CategoricalIndex):
+ cat = (ind.ordered, ind.categories)
+ values = ind.codes
+ else:
+ cat = None
+ values = ind.values
+
+ header = (type(ind), ind._get_attributes_dict(), values.dtype, cat)
+ bytes = pnp.compress(pnp.serialize(values), values.dtype)
+ return header, bytes
+
+
+def index_from_header_bytes(header, bytes):
+ if header is None:
+ return pickle.loads(bytes)
+
+ typ, attr, dtype, cat = header
+ data = pnp.deserialize(pnp.decompress(bytes, dtype), dtype, copy=True)
+ if cat:
+ data = pd.Categorical.from_codes(data, cat[1], ordered=cat[0])
+ return typ.__new__(typ, data=data, **attr)
+
+
+def block_to_header_bytes(block):
+ values = block.values
+ try:
+ # pandas >= 0.19
+ from pandas.api.types import is_datetime64tz_dtype
+ except ImportError:
+ from pandas.core.common import is_datetime64tz_dtype
+
+ if isinstance(values, pd.Categorical):
+ extension = ('categorical_type', (values.ordered, values.categories))
+ values = values.codes
+ elif is_datetime64tz_dtype(block):
+ # TODO: compat with older pandas?
+ extension = ('datetime64_tz_type', (block.values.tzinfo,))
+ values = np.asarray(values)
+ else:
+ extension = ('numpy_type', ())
-def to_blocks(df):
- blocks = [block.values for block in df._data.blocks]
- index = df.index.values
- placement = [ b.mgr_locs.as_array for b in df._data.blocks ]
- return blocks, index, df.index.name, list(df.columns), placement
- return {'blocks': blocks,
- 'index': index,
- 'index_name': df.index.name,
- 'columns': df.columns,
- 'placement': [ b.mgr_locs.as_array for b in df._data.blocks ]}
+ header = (block.mgr_locs.as_array, values.dtype, values.shape, extension)
+ bytes = pnp.compress(pnp.serialize(values), values.dtype)
+ return header, bytes
-def from_blocks(blocks, index, index_name, columns, placement):
- blocks = [ make_block(b, placement=placement[i]) for i, b in enumerate(blocks) ]
- axes = [_ensure_index(columns), _ensure_index(index) ]
- df = pd.DataFrame(create_block_manager_from_blocks(blocks, axes))
- df.index.name = index_name
- return df
+def block_from_header_bytes(header, bytes):
+ placement, dtype, shape, (extension_type, extension_values) = header
+ values = pnp.deserialize(pnp.decompress(bytes, dtype), dtype,
+ copy=True).reshape(shape)
+ if extension_type == 'categorical_type':
+ values = pd.Categorical.from_codes(values,
+ extension_values[1],
+ ordered=extension_values[0])
+ elif extension_type == 'datetime64_tz_type':
+ tz_info = extension_values[0]
+ values = pd.DatetimeIndex(values).tz_localize('utc').tz_convert(
+ tz_info)
+ return make_block(values, placement=placement)
-from . import numpy as pnp
-from .utils import framesplit, frame
def serialize(df):
""" Serialize and compress a Pandas DataFrame
Uses Pandas blocks, snappy, and blosc to deconstruct an array into bytes
"""
- blocks, index, index_name, columns, placement = to_blocks(df)
- categories = [(b.ordered, b.categories)
- if isinstance(b, pd.Categorical)
- else None
- for b in blocks]
- blocks = [b.codes if isinstance(b, pd.Categorical) else b
- for b in blocks]
- b_blocks = [pnp.compress(pnp.serialize(block), block.dtype)
- for block in blocks] # this can be slightly faster if we merge both operations
- b_index = pnp.compress(pnp.serialize(index), index.dtype)
- frames = [dumps(index_name),
- dumps(columns),
- dumps(placement),
- dumps(index.dtype),
- b_index,
- dumps([block.dtype for block in blocks]),
- dumps([block.shape for block in blocks]),
- dumps(categories)] + b_blocks
+ col_header, col_bytes = index_to_header_bytes(df.columns)
+ ind_header, ind_bytes = index_to_header_bytes(df.index)
+ headers = [col_header, ind_header]
+ bytes = [col_bytes, ind_bytes]
+ for block in df._data.blocks:
+ h, b = block_to_header_bytes(block)
+ headers.append(h)
+ bytes.append(b)
+
+ frames = [dumps(headers)] + bytes
return b''.join(map(frame, frames))
def deserialize(bytes):
""" Deserialize and decompress bytes back to a pandas DataFrame """
frames = list(framesplit(bytes))
- index_name = pickle.loads(frames[0])
- columns = pickle.loads(frames[1])
- placement = pickle.loads(frames[2])
- dt = pickle.loads(frames[3])
- index = pnp.deserialize(pnp.decompress(frames[4], dt), dt, copy=True)
- dtypes = pickle.loads(frames[5])
- shapes = pickle.loads(frames[6])
- categories = pickle.loads(frames[7])
- b_blocks = frames[8:]
- blocks = [pnp.deserialize(pnp.decompress(block, dt), dt, copy=True).reshape(shape)
- for block, dt, shape in zip(b_blocks, dtypes, shapes)]
- blocks = [pd.Categorical.from_codes(b, cat[1], ordered=cat[0])
- if cat is not None
- else b
- for cat, b in zip(categories, blocks)]
-
- return from_blocks(blocks, index, index_name, columns, placement)
-
-
-from .encode import Encode
+ headers = pickle.loads(frames[0])
+ bytes = frames[1:]
+ axes = [index_from_header_bytes(headers[0], bytes[0]),
+ index_from_header_bytes(headers[1], bytes[1])]
+ blocks = [block_from_header_bytes(h, b)
+ for (h, b) in zip(headers[2:], bytes[2:])]
+ return pd.DataFrame(create_block_manager_from_blocks(blocks, axes))
def join(dfs):
diff --git a/partd/tests/test_file.py b/partd/tests/test_file.py
index 4c4dbd1..2dbcb79 100644
--- a/partd/tests/test_file.py
+++ b/partd/tests/test_file.py
@@ -73,3 +73,8 @@ def test_del():
with File('Foo') as p:
p.__del__()
assert os.path.exists(p.path)
+
+
+def test_specify_dirname():
+ with File(dir=os.getcwd()) as f:
+ assert os.getcwd() in f.path
diff --git a/partd/tests/test_numpy.py b/partd/tests/test_numpy.py
index 2cbfd58..fa8eec2 100644
--- a/partd/tests/test_numpy.py
+++ b/partd/tests/test_numpy.py
@@ -1,14 +1,13 @@
from __future__ import absolute_import
import pytest
-pytest.importorskip('numpy')
+np = pytest.importorskip('numpy') # noqa
-import numpy as np
-import os
-import shutil
import pickle
-from partd.numpy import Numpy, decode
+import partd
+from partd.numpy import Numpy
+
def test_numpy():
dt = np.dtype([('a', 'i4'), ('b', 'i2'), ('c', 'f8')])
@@ -43,8 +42,13 @@ def test_serialization():
assert (q.get('x') == [1, 2, 3]).all()
-def test_object_dtype():
- x = np.array(['Alice', 'Bob', 'Charlie'], dtype='O')
+array_of_lists = np.empty(3, dtype='O')
+array_of_lists[:] = [[1, 2], [3, 4], [5, 6]]
+
+
+@pytest.mark.parametrize('x', [np.array(['Alice', 'Bob', 'Charlie'], dtype='O'),
+ array_of_lists])
+def test_object_dtype(x):
with Numpy() as p:
p.append({'x': x})
p.append({'x': x})
@@ -61,7 +65,8 @@ def test_datetime_types():
assert p.get('y').dtype == y.dtype
-def test_decode():
- assert decode([]) == []
- assert decode(np.nan) is np.nan
- assert decode([b'a', 1]) == ['a', 1]
+def test_non_utf8_bytes():
+ a = np.array([b'\xc3\x28', b'\xa0\xa1', b'\xe2\x28\xa1', b'\xe2\x82\x28',
+ b'\xf0\x28\x8c\xbc'], dtype='O')
+ s = partd.numpy.serialize(a)
+ assert (partd.numpy.deserialize(s, 'O') == a).all()
diff --git a/partd/tests/test_pandas.py b/partd/tests/test_pandas.py
index ac840cf..d8d4323 100644
--- a/partd/tests/test_pandas.py
+++ b/partd/tests/test_pandas.py
@@ -1,12 +1,12 @@
from __future__ import absolute_import
import pytest
-pytest.importorskip('pandas')
+pytest.importorskip('pandas') # noqa
+import numpy as np
import pandas as pd
import pandas.util.testing as tm
import os
-import shutil
from partd.pandas import PandasColumns, PandasBlocks, serialize, deserialize
@@ -43,7 +43,6 @@ def test_PandasColumns():
assert not os.path.exists(p.partd.partd.path)
-
def test_column_selection():
with PandasColumns('foo') as p:
p.append({'x': df1, 'y': df2})
@@ -73,10 +72,40 @@ def test_PandasBlocks():
@pytest.mark.parametrize('ordered', [False, True])
def test_serialize_categoricals(ordered):
- df = pd.DataFrame({'x': [1, 2, 3, 4],
- 'y': pd.Categorical(['c', 'a', 'b', 'a'],
- ordered=ordered)})
+ frame = pd.DataFrame({'x': [1, 2, 3, 4],
+ 'y': pd.Categorical(['c', 'a', 'b', 'a'],
+ ordered=ordered)},
+ index=pd.Categorical(['x', 'y', 'z', 'x'],
+ ordered=ordered))
+ frame.index.name = 'foo'
+ frame.columns.name = 'bar'
+
+ for ind, df in [(0, frame), (1, frame.T)]:
+ df2 = deserialize(serialize(df))
+ tm.assert_frame_equal(df, df2)
+
+
+def test_serialize_multi_index():
+ df = pd.DataFrame({'x': ['a', 'b', 'c', 'a', 'b', 'c'],
+ 'y': [1, 2, 3, 4, 5, 6],
+ 'z': [7., 8, 9, 10, 11, 12]})
+ df = df.groupby([df.x, df.y]).sum()
+ df.index.name = 'foo'
+ df.columns.name = 'bar'
+
+ df2 = deserialize(serialize(df))
+ tm.assert_frame_equal(df, df2)
+
+@pytest.mark.parametrize('base', [
+ pd.Timestamp('1987-03-3T01:01:01+0001'),
+ pd.Timestamp('1987-03-03 01:01:01-0600', tz='US/Central'),
+])
+def test_serialize(base):
+ df = pd.DataFrame({'x': [
+ base + pd.Timedelta(seconds=i)
+ for i in np.random.randint(0, 1000, size=10)],
+ 'y': list(range(10)),
+ 'z': pd.date_range('2017', periods=10)})
df2 = deserialize(serialize(df))
tm.assert_frame_equal(df, df2)
- assert df.y.cat.ordered == df2.y.cat.ordered
diff --git a/setup.py b/setup.py
index 4301711..0e73810 100755
--- a/setup.py
+++ b/setup.py
@@ -4,7 +4,7 @@ from os.path import exists
from setuptools import setup
setup(name='partd',
- version='0.3.7',
+ version='0.3.8',
description='Appendable key-value storage',
url='http://github.com/dask/partd/',
maintainer='Matthew Rocklin',
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/partd.git
More information about the Python-modules-commits
mailing list