[med-svn] [Git][med-team/intake][upstream] New upstream version 0.6.5
Andreas Tille (@tille)
gitlab at salsa.debian.org
Sun Jan 16 21:08:01 GMT 2022
Andreas Tille pushed to branch upstream at Debian Med / intake
Commits:
1a421721 by Andreas Tille at 2022-01-16T21:04:07+01:00
New upstream version 0.6.5
- - - - -
29 changed files:
- .github/workflows/main.yaml
- docs/source/api_user.rst
- docs/source/catalog.rst
- docs/source/conf.py
- docs/source/plugin-directory.rst
- intake/_version.py
- intake/catalog/base.py
- intake/catalog/local.py
- intake/catalog/remote.py
- intake/catalog/tests/test_local.py
- intake/catalog/tests/test_parameters.py
- intake/catalog/tests/test_reload_integration.py
- intake/catalog/tests/test_utils.py
- intake/catalog/utils.py
- intake/cli/client/subcommands/drivers.py
- intake/conftest.py
- intake/source/discovery.py
- + intake/source/jsonfiles.py
- intake/source/tests/test_discovery.py
- + intake/source/tests/test_json.py
- + intake/source/tests/test_tiled.py
- + intake/source/tiled.py
- intake/tests/catalog_inherit_params.yml
- intake/tests/catalog_nested_sub.yml
- requirements.txt
- scripts/ci/environment-py37.yml
- scripts/ci/environment-py39.yml
- setup.py
- versioneer.py
Changes:
=====================================
.github/workflows/main.yaml
=====================================
@@ -13,7 +13,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- CONDA_ENV: [py37, py38, pip]
+ CONDA_ENV: [py37, py38, py39, pip]
steps:
- name: Checkout
uses: actions/checkout@v2
=====================================
docs/source/api_user.rst
=====================================
@@ -12,6 +12,8 @@ These are reference class and function definitions likely to be useful to everyo
intake.open_
intake.source.csv.CSVSource
intake.source.textfiles.TextFilesSource
+ intake.source.jsonfiles.JSONFileSource
+ intake.source.jsonfiles.JSONLinesFileSource
intake.source.npy.NPySource
intake.source.zarr.ZarrArraySource
intake.catalog.local.YAMLFileCatalog
@@ -51,6 +53,12 @@ Source classes
.. autoclass:: intake.source.textfiles.TextFilesSource
:members: __init__, discover, read_partition, read, to_dask, persist, export
+.. autoclass:: intake.source.jsonfiles.JSONFileSource
+ :members: __init__, discover, read, persist, export
+
+.. autoclass:: intake.source.jsonfiles.JSONLinesFileSource
+ :members: __init__, discover, read, head, persist, export
+
.. autoclass:: intake.source.npy.NPySource
:members: __init__, discover, read_partition, read, to_dask, persist, export
=====================================
docs/source/catalog.rst
=====================================
@@ -293,6 +293,9 @@ resolve to ``"http://server:port/user/"``
Parameter Definition
--------------------
+Source parameters
+'''''''''''''''''
+
A source definition can contain a "parameters" block.
Expressed in YAML, a parameter may look as follows:
@@ -301,7 +304,7 @@ Expressed in YAML, a parameter may look as follows:
parameters:
name:
description: name to use # human-readable text for what this parameter means
- type: str # one of bool, str, int, float, list[str], list[int], list[float], datetime
+ type: str # one of bool, str, int, float, list[str | int | float], datetime, mlist
default: normal # optional, value to assume if user does not override
allowed: ["normal", "strange"] # optional, list of values that are OK, for validation
min: "n" # optional, minimum allowed, for validation
@@ -322,9 +325,62 @@ can have one of two uses:
for example, an integer. It makes no sense to provide a default for this case (the argument already has a value),
but providing a default will not raise an exception.
+- the "mlist" type is special: it means that the input must be a list, whose values are chosen from the
+ allowed list. This is the only type where the parameter value is not the same type as the allowed list's
+ values: e.g., if ``allowed`` holds str values, the final value must be a list of those str values.
+
Note: the ``datetime`` type accepts multiple values:
Python datetime, ISO8601 string, Unix timestamp int, "now" and "today".
+Catalog parameters
+''''''''''''''''''
+
+You can also define user parameters at the catalog level. This applies the parameter to
+all entries within that catalog, without having to define it for each and every entry.
+Furthermore, catalogs nested within the catalog will also inherit the parameter(s).
+
+For example, with the following spec
+
+.. code-block:: yaml
+
+ metadata:
+ version: 1
+ parameters:
+ bucket:
+ type: str
+ description: description
+ default: test_bucket
+ sources:
+ param_source:
+ driver: parquet
+ description: description
+ args:
+ urlpath: s3://{{bucket}}/file.parquet
+ subcat:
+ driver: yaml_file
+ path: "{{CATALOG_DIR}}/other.yaml"
+
+If ``cat`` is the corresponding catalog instance,
+the URL of source ``cat.param_source`` will evaluate to "s3://test_bucket/file.parquet" by default, but
+the parameter can be overridden with ``cat.param_source(bucket="other_bucket")``. Also, any
+entries of ``subcat``, another catalog referenced from here, would also have the "bucket"-named
+parameter attached to all sources. Of course, those sources do not need to make use of the
+parameter.
+
+To change the default, we can generate a new instance:
+
+.. code-block:: python
+
+ cat2 = cat(bucket="production") # sets default value of "bucket" for cat2
+ subcat = cat.subcat(bucket="production") # sets default only for the nested catalog
+
+Of course, in these situations you can still override the value of the parameter for any
+source, or pass explicit values for the arguments of the source, as normal.
+
+For cases where the catalog is not defined in a YAML spec, the argument ``user_parameters``
+to the constructor takes the same form as ``parameters`` above: a dict of user parameters,
+either as ``UserParameter`` instances or as a dictionary spec for each one.
+
Driver Selection
----------------
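
[For illustration of the parameter documentation above: a minimal sketch of
the new "mlist" type driven through UserParameter directly, mirroring the
new test_mlist_parameter; the parameter name "fields" and its allowed values
are hypothetical.]

    from intake.catalog.local import UserParameter

    # "mlist": the value must be a list whose elements all come from `allowed`
    up = UserParameter("fields", "columns to keep", "mlist", allowed=["a", "b"])
    up.validate(["a"])         # accepted
    up.validate(["b", "a"])    # accepted
    # up.validate(["c"])       # raises ValueError: "c" is not in `allowed`
    # up.validate("hello")     # raises: a plain string is not a list
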
=====================================
docs/source/conf.py
=====================================
@@ -182,6 +182,3 @@ intersphinx_mapping = {'https://docs.python.org/': None}
numpydoc_show_class_members = False
numpydoc_show_inherited_class_members = False
numpydoc_class_members_toctree = False
-
-def setup(app):
- app.add_stylesheet('css/custom.css')
=====================================
docs/source/plugin-directory.rst
=====================================
@@ -7,7 +7,8 @@ This is a list of known projects which install driver plugins for Intake, and th
contains in parentheses:
* builtin to Intake (``catalog``, ``csv``, ``intake_remote``, ``ndzarr``,
- ``numpy``, ``textfiles``, ``yaml_file_cat``, ``yaml_files_cat``, ``zarr_cat``)
+ ``numpy``, ``textfiles``, ``yaml_file_cat``, ``yaml_files_cat``, ``zarr_cat``,
+ ``json``, ``jsonl``)
* `intake-astro <https://github.com/intake/intake-astro>`_ Table and array loading of FITS astronomical data (``fits_array``, ``fits_table``)
* `intake-accumulo <https://github.com/intake/intake-accumulo>`_ Apache Accumulo clustered data storage (``accumulo``)
* `intake-avro <https://github.com/intake/intake-avro>`_: Apache Avro data serialization format (``avro_table``, ``avro_sequence``)
@@ -17,6 +18,7 @@ contains in parentheses:
* `intake-elasticsearch <https://github.com/intake/intake-elasticsearch>`_: Elasticsearch search and analytics engine (``elasticsearch_seq``, ``elasticsearch_table``)
* `intake-esm <https://github.com/NCAR/intake-esm>`_: Plugin for building and loading intake catalogs for earth system data sets holdings, such as `CMIP <https://cmip.llnl.gov/>`_ (Coupled Model Intercomparison Project) and CESM Large Ensemble datasets.
* `intake-geopandas <https://github.com/informatics-lab/intake_geopandas>`_: load from ESRI Shape Files, GeoJSON, and geospatial databases with geopandas (``geojson``, ``postgis``, ``shapefile``, ``spatialite``) and ``regionmask`` for opening shapefiles into `regionmask <https://github.com/mathause/regionmask/>`_.
+* `intake-google-analytics <https://github.com/intake/intake-google-analytics>`_: run Google Analytics queries and load data as a DataFrame (``google_analytics_query``)
* `intake-hbase <https://github.com/intake/intake-hbase>`_: Apache HBase database (``hbase``)
* `intake-iris <https://github.com/informatics-lab/intake-iris>`_ load netCDF and GRIB files with IRIS (``grib``, ``netcdf``)
* `intake-metabase <https://github.com/continuumio/intake-metabase>`_: Generate catalogs and load tables as DataFrames from Metabase (``metabase_catalog``, ``metabase_table``)
=====================================
intake/_version.py
=====================================
@@ -24,9 +24,9 @@ def get_keywords():
# setup.py/versioneer.py will grep for the variable names, so they must
# each be defined on a line of their own. _version.py will just call
# get_keywords().
- git_refnames = " (HEAD -> master, tag: 0.6.4)"
- git_full = "d9faa048bbc09d74bb6972f672155e3814c3ca62"
- git_date = "2021-10-08 10:34:28 -0400"
+ git_refnames = " (HEAD -> master, tag: 0.6.5)"
+ git_full = "bc1d43524f9f0c38ccc5285d1f3fa02c37339372"
+ git_date = "2022-01-09 17:32:52 -0500"
keywords = {"refnames": git_refnames, "full": git_full, "date": git_date}
return keywords
=====================================
intake/catalog/base.py
=====================================
@@ -42,7 +42,7 @@ class Catalog(DataSource):
def __init__(self, entries=None, name=None, description=None, metadata=None,
ttl=60, getenv=True, getshell=True,
- persist_mode='default', storage_options=None):
+ persist_mode='default', storage_options=None, user_parameters=None):
"""
Parameters
----------
@@ -83,6 +83,15 @@ class Catalog(DataSource):
self.getenv = getenv
self.getshell = getshell
self.storage_options = storage_options
+ if isinstance(user_parameters, dict) and user_parameters:
+ from .local import UserParameter
+ self.user_parameters = {name: (UserParameter(name=name, **up) if isinstance(up, dict)
+ else up)
+ for name, up in user_parameters.items()}
+ elif isinstance(user_parameters, (list, tuple)):
+ self.user_parameters = {up["name"]: up for up in user_parameters}
+ else:
+ self.user_parameters = {}
if persist_mode not in ['always', 'never', 'default']:
# should be True, False, None ?
raise ValueError('Persist mode (%s) not understood' % persist_mode)
@@ -181,7 +190,9 @@ class Catalog(DataSource):
getenv=self.getenv,
getshell=self.getshell,
metadata=(self.metadata or {}).copy(),
- storage_options=self.storage_options)
+ storage_options=self.storage_options,
+ user_parameters=self.user_parameters.copy()
+ )
cat.metadata['search'] = {'text': text, 'upstream': self.name}
cat.cat = self
for e in entries.values():
@@ -304,8 +315,38 @@ class Catalog(DataSource):
entry = self._entries[name]
entry._catalog = self
entry._pmode = self.pmode
+
+ up_names = set((up["name"] if isinstance(up, dict) else up.name)
+ for up in entry._user_parameters)
+ ups = [up for name, up in self.user_parameters.items() if name not in up_names]
+ entry._user_parameters = ups + (entry._user_parameters or [])
return entry()
+ def configure_new(self, **kwargs):
+ from .local import UserParameter
+ ups = {}
+ for k, v in kwargs.copy().items():
+ for up in self.user_parameters.values():
+ if isinstance(up, dict):
+ if k == up["name"]:
+ kw = up.copy()
+ kw['default'] = v
+ ups[k] = UserParameter(**kw)
+ kwargs.pop(k)
+ else:
+ if k == up.name:
+ kw = up._captured_init_kwargs.copy()
+ kw['default'] = v
+ kw["name"] = k
+ ups[k] = UserParameter(**kw)
+ kwargs.pop(k)
+
+ new = super().configure_new(**kwargs)
+ new.user_parameters.update(ups)
+ return new
+
+ __call__ = get = configure_new
+
@reload_on_change
def _get_entries(self):
return self._entries
@@ -390,14 +431,14 @@ class Catalog(DataSource):
nested directories with cat.name1.name2, cat['name1.name2'] *or*
cat['name1', 'name2']
"""
- if not isinstance(key, list) and key in self._get_entries():
+ if not isinstance(key, list) and key in self:
# triggers reload_on_change
- e = self._entries[key]
- e._catalog = self
- e._pmode = self.pmode
- if e.container == 'catalog':
- return e(name=key)
- return e()
+ s = self._get_entry(key)
+ if s.container == 'catalog':
+ s.name = key
+ s.user_parameters.update(self.user_parameters.copy())
+ return s
+ return s
if isinstance(key, str) and '.' in key:
key = key.split('.')
if isinstance(key, list):
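
[Net effect of the configure_new/__call__ path added above: calling a catalog
with a keyword that matches a user parameter returns a clone in which that
parameter's default is replaced, while unknown keywords still flow through
the normal configure_new machinery. A minimal sketch; the catalog file name
is hypothetical.]

    import intake

    cat = intake.open_catalog("catalog.yml")   # defines a "bucket" parameter
    cat2 = cat(bucket="production")            # __call__ -> configure_new
    # cat2 carries a copy of "bucket" whose default is now "production";
    # sources obtained from cat2 render {{bucket}} accordingly.
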
=====================================
intake/catalog/local.py
=====================================
@@ -42,7 +42,7 @@ class UserParameter(DictSerialiseMixin):
description: str
narrative text
type: str
- one of list``(COERCION_RULES)``
+ one of list ``(COERCION_RULES)``
default: type value
same type as ``type``. If a str, may include special functions
env, shell, client_env, client_shell.
@@ -55,7 +55,7 @@ class UserParameter(DictSerialiseMixin):
min=None, max=None, allowed=None):
self.name = name
self.description = description
- self.type = type
+ self.type = type or __builtins__["type"](default).__name__
self.min = min
self.max = max
self.allowed = allowed
@@ -691,28 +691,14 @@ class YAMLFileCatalog(Catalog):
cfg = result.data
self._entries = {}
- try:
- shared_parameters = data["metadata"]["parameters"]
- params = [
- UserParameter(name, **attrs)
- for name, attrs in shared_parameters.items()
- ]
- except KeyError:
- params = None
+ shared_parameters = data.get("metadata", {}).get("parameters", {})
+ self.user_parameters.update({
+ name: UserParameter(name, **attrs)
+ for name, attrs in shared_parameters.items()
+ })
for entry in cfg['data_sources']:
entry._catalog = self
- if params is not None:
- try:
- # Note that putting the entry parameters after the global parameters
- # means that local parameters will overwrite any inherited parameters
- all_params = params + entry._user_parameters
- except AttributeError:
- all_params = params
-
- entry.__setattr__("_user_parameters", all_params)
- # Necessary for a copy of the entry to have the same parameters
- entry._captured_init_kwargs["parameters"] = all_params
self._entries[entry.name] = entry
entry._filesystem = self.filesystem
@@ -793,11 +779,14 @@ class YAMLFilesCatalog(Catalog):
if f.path not in self._cats:
entry = LocalCatalogEntry(name, "YAML file: %s" % name,
'yaml_file_cat', True,
- kwargs, [], {}, self.metadata, d)
+ kwargs, [], [], self.metadata, d)
if self._flatten:
# store a concrete Catalog
try:
- self._cats[f.path] = entry()
+ cat = entry()
+ cat.reload()
+ self.user_parameters.update(cat.user_parameters)
+ self._cats[f.path] = cat
except IOError as e:
logger.info('Loading "%s" as a catalog failed: %s'
'' % (entry, e))
@@ -817,10 +806,6 @@ class YAMLFilesCatalog(Catalog):
entries.update(entry._entries)
else:
entries[entry._name] = entry
- # This is thread safe as long as the hash values for entries are
- # computed in C --- i.e. if they are built-in types like strings. If
- # they implement __hash__ in Python then this is not threadsafe, and it
- # would need a lock.
self._entries.update(entries)
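
[One consequence of the UserParameter change above: "type" may now be
omitted, in which case it is inferred from the default value's class.
A sketch, assuming the usual positional signature (name, description, type):]

    from intake.catalog.local import UserParameter

    up = UserParameter("n", "how many rows", None, default=10)
    print(up.type)   # "int", inferred via type(default).__name__
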
=====================================
intake/catalog/remote.py
=====================================
@@ -156,6 +156,7 @@ class RemoteCatalog(Catalog):
auth=self.auth,
http_args=self.http_args,
page_size=self._page_size,
+ persist_mode=self.pmode,
# user_parameters=user_parameters,
**source)
return page
@@ -181,6 +182,7 @@ class RemoteCatalog(Catalog):
auth=self.auth,
http_args=self.http_args,
page_size=self._page_size,
+ persist_mode=self.pmode,
**info['source'])
def _get_http_args(self, params):
@@ -274,6 +276,7 @@ class RemoteCatalog(Catalog):
url=self.url,
http_args=self.http_args,
source_id=source_id,
+ persist_mode=self.pmode,
name="")
cat.cat = self
return cat
@@ -389,8 +392,8 @@ class RemoteCatalogEntry(CatalogEntry):
"""An entry referring to a remote data definition"""
def __init__(self, url, auth, name=None, user_parameters=None,
container=None, description='', metadata=None,
- http_args=None, page_size=None, direct_access=False,
- getenv=True, getshell=True, **kwags):
+ http_args=None, page_size=None, persist_mode="default", direct_access=False,
+ getenv=True, getshell=True, **kwargs):
"""
Parameters
@@ -412,6 +415,8 @@ class RemoteCatalogEntry(CatalogEntry):
self.description = description
self._metadata = metadata or {}
self._page_size = page_size
+ # Persist mode describing a nested RemoteCatalog
+ self.catalog_pmode = persist_mode
self._user_parameters = [remake_instance(up)
if (isinstance(up, dict) and 'cls' in up)
else up
@@ -423,6 +428,8 @@ class RemoteCatalogEntry(CatalogEntry):
super(RemoteCatalogEntry, self).__init__(getenv=getenv,
getshell=getshell)
+
+ # Persist mode for the RemoteCatalogEntry
self._pmode = "never"
def describe(self):
@@ -456,11 +463,12 @@ class RemoteCatalogEntry(CatalogEntry):
page_size=self._page_size,
auth=self.auth,
getenv=self.getenv,
+ persist_mode=self.catalog_pmode,
getshell=self.getshell)
def open_remote(url, entry, container, user_parameters, description, http_args,
- page_size=None, auth=None, getenv=None, getshell=None):
+ page_size=None, persist_mode=None, auth=None, getenv=None, getshell=None):
"""Create either local direct data source or remote streamed source"""
from intake.container import container_map
import msgpack
@@ -499,13 +507,14 @@ def open_remote(url, entry, container, user_parameters, description, http_args,
response.update({'auth': auth,
'getenv': getenv,
'getshell': getshell,
- 'page_size': page_size
+ 'page_size': page_size,
+ 'persist_mode': persist_mode
# TODO ttl?
# TODO storage_options?
})
source = container_map[container](url, http_args, **response)
source.description = description
return source
-
else:
raise Exception('Server error: %d, %s' % (req.status_code, req.reason))
+
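
[The persist_mode plumbing above means a client's choice now survives into
nested remote catalogs and the entries created from them. A rough sketch,
assuming a local intake server; the address and entry names are
hypothetical.]

    import intake

    cat = intake.open_catalog("intake://localhost:5000", persist_mode="never")
    sub = cat.some_nested_cat   # a RemoteCatalog: inherits persist_mode="never"
    src = sub.some_source()     # entries created from it carry the same mode
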
=====================================
intake/catalog/tests/test_local.py
=====================================
@@ -803,23 +803,6 @@ def test_cat_dictlike(catalog1):
assert list(catalog1.items()) == list(zip(catalog1.keys(), catalog1.values()))
-@pytest.fixture
-def inherit_params_cat():
- with tempfile.TemporaryDirectory() as tmp_dir:
- tmp_path = posixpath.join(tmp_dir, "intake")
- target_catalog = copy_test_file("catalog_inherit_params.yml", tmp_path)
- return open_catalog(target_catalog)
-
-
-@pytest.fixture
-def inherit_params_multiple_cats():
- with tempfile.TemporaryDirectory() as tmp_dir:
- tmp_path = posixpath.join(tmp_dir, "intake")
- copy_test_file("catalog_inherit_params.yml", tmp_path)
- copy_test_file("catalog_nested_sub.yml", tmp_path)
- return open_catalog(tmp_path + "/*.yml")
-
-
def test_inherit_params(inherit_params_cat):
assert inherit_params_cat.param._urlpath == "s3://test_bucket/file.parquet"
=====================================
intake/catalog/tests/test_parameters.py
=====================================
@@ -1,5 +1,6 @@
import os
import pytest
+import intake
from intake.catalog.local import LocalCatalogEntry, UserParameter
from intake.source.base import DataSource
@@ -163,6 +164,19 @@ def test_validate_par():
assert s.kwargs['arg1'] == 1 # a number, not str
+def test_mlist_parameter():
+ up = UserParameter("", "", "mlist", allowed=["a", "b"])
+ up.validate([])
+ up.validate(['b'])
+ up.validate(['b', 'a'])
+ with pytest.raises(ValueError):
+ up.validate(["c"])
+ with pytest.raises(ValueError):
+ up.validate(["a", "c"])
+ with pytest.raises(TypeError):
+ up.validate("hello")
+
+
def test_explicit_overrides():
e = LocalCatalogEntry('', '', driver, args={'arg1': "oi"})
s = e(arg1='hi')
@@ -204,3 +218,20 @@ def test_unknown():
parameters=[up])
s = e()
assert s.kwargs['arg1'] == ""
+
+
+def test_catalog_passthrough():
+ root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
+ cat = intake.open_catalog(os.path.join(root, "tests/catalog_inherit_params.yml"))
+ assert set(cat.subcat.user_parameters) == {"bucket", "inner"}
+ url = cat.subcat.ex2.urlpath
+ assert url == "test_bucket/test_name"
+ url = cat.subcat.ex2(bucket="hi", inner="ho").urlpath
+ assert url == "hi/ho"
+
+ # test clone
+ cat2 = cat(bucket="yet")
+ url = cat2.subcat(inner="another").ex2.urlpath
+ assert url == "yet/another"
+ url = cat2.subcat(inner="another").ex2(bucket="hi").urlpath
+ assert url == "hi/another"
=====================================
intake/catalog/tests/test_reload_integration.py
=====================================
@@ -45,9 +45,9 @@ sources:
description: example1 source plugin
driver: example1
args: {}
- ''')
+''')
- time.sleep(2)
+ time.sleep(1)
yield intake_server
os.remove(fullname)
@@ -59,22 +59,13 @@ def test_reload_updated_config(intake_server_with_config):
entries = list(catalog)
assert entries == ['use_example1']
- with open(os.path.join(TMP_DIR, YAML_FILENAME), 'w') as f:
+ with open(os.path.join(TMP_DIR, YAML_FILENAME), 'a') as f:
f.write('''
-plugins:
- source:
- - module: intake.catalog.tests.example1_source
- - module: intake.catalog.tests.example_plugin_dir.example2_source
-sources:
- use_example1:
- description: example1 source plugin
- driver: example1
- args: {}
use_example1_1:
description: example1 other
driver: example1
args: {}
- ''')
+''')
time.sleep(1.2)
=====================================
intake/catalog/tests/test_utils.py
=====================================
@@ -40,3 +40,18 @@ def test_coerce_datetime(test_input, expected):
def test_flatten():
assert list(utils.flatten([["hi"], ["oi"]])) == ['hi', 'oi']
+
+@pytest.mark.parametrize(
+ "value,dtype,expected", [
+ (1, "int", 1),
+ ("1", "int", 1),
+ (1, "str", "1"),
+ ((), "list", []),
+ ((1, ), "list", [1]),
+ ((1, ), "list[str]", ["1"])
+ ],
+)
+def test_coerce(value, dtype, expected):
+ out = utils.coerce(dtype, value)
+ assert out == expected
+ assert type(out) == type(expected)
=====================================
intake/catalog/utils.py
=====================================
@@ -5,6 +5,8 @@
# The full license is in the LICENSE file, distributed with this software.
#-----------------------------------------------------------------------------
+import ast
+from collections.abc import Iterable
import functools
import itertools
from jinja2 import Environment, meta, Undefined
@@ -299,6 +301,11 @@ def coerce(dtype, value):
type constructor is returned. Otherwise, the type constructor converts
and returns the value.
"""
+ if "[" in dtype:
+ dtype, inner = dtype.split("[")
+ inner = inner.rstrip("]")
+ else:
+ inner = None
if dtype is None:
return value
if type(value).__name__ == dtype:
@@ -306,11 +313,21 @@ def coerce(dtype, value):
if dtype == "mlist":
if isinstance(value, (tuple, set, dict)):
return list(value)
- if not isinstance(value, list):
- raise TypeError("Will not coerce value %s to list", value)
+ if isinstance(value, str):
+ try:
+ value = ast.literal_eval(value)
+ return list(value)
+ except ValueError as e:
+ raise ValueError("Failed to coerce string to list") from e
return value
op = COERCION_RULES[dtype]
- return op() if value is None else op(value)
+ out = op() if value is None else op(value)
+ if isinstance(out, dict) and inner is not None:
+ # TODO: recurse into coerce here, to allow list[list[str]] and such?
+ out = {k: COERCION_RULES[inner](v) for k, v in out.items()}
+ if isinstance(out, (tuple, list, set)) and inner is not None:
+ out = op(COERCION_RULES[inner](v) for v in out)
+ return out
class RemoteCatalogError(Exception):
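
[The extended coerce() now understands parametrized container types: the
outer type is applied first, then each element is converted with the inner
rule (a value that already has the outer type is returned unchanged).
Examples mirroring the new tests:]

    from intake.catalog.utils import coerce

    coerce("int", "1")           # -> 1
    coerce("list", (1,))         # -> [1]
    coerce("list[str]", (1,))    # -> ["1"]  (outer list, inner str per element)
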
=====================================
intake/cli/client/subcommands/drivers.py
=====================================
@@ -75,8 +75,13 @@ class Drivers(Subcommand):
fmt = '{name:<30}{cls.__module__}.{cls.__name__}'
drivers_by_name = autodiscover() # dict mapping name to driver
all_drivers = autodiscover_all() # listof (name, driver)
- direct = {k: v for k, v in intake.registry.items()
- if k not in all_drivers and k not in drivers_by_name}
+ direct = {}
+ for k in intake.registry:
+ if k not in all_drivers and k not in drivers_by_name:
+ try:
+ direct[k] = intake.registry[k]
+ except ImportError:
+ pass
print("Direct:", file=sys.stderr)
none = True
=====================================
intake/conftest.py
=====================================
@@ -9,15 +9,18 @@ import os
import subprocess
import time
+import posixpath
import pytest
import requests
+import tempfile
-from intake import config
+from intake import config, open_catalog
from intake.container import persist
from intake.util_tests import ex, PY2
from intake.utils import make_path_posix
from intake.source.base import DataSource, Schema
from intake import register_driver
+from intake.tests.test_utils import copy_test_file
here = os.path.dirname(__file__)
@@ -201,3 +204,29 @@ def env(temp_cache, tempdir):
env["INTAKE_CONF_DIR"] = intake.config.confdir
env['INTAKE_CACHE_DIR'] = intake.config.conf['cache_dir']
return env
+
+
+@pytest.fixture
+def inherit_params_cat():
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ tmp_path = posixpath.join(tmp_dir, "intake")
+ target_catalog = copy_test_file("catalog_inherit_params.yml", tmp_path)
+ return open_catalog(target_catalog)
+
+
+@pytest.fixture
+def inherit_params_multiple_cats():
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ tmp_path = posixpath.join(tmp_dir, "intake")
+ copy_test_file("catalog_inherit_params.yml", tmp_path)
+ copy_test_file("catalog_nested_sub.yml", tmp_path)
+ return open_catalog(tmp_path + "/*.yml")
+
+
+@pytest.fixture
+def inherit_params_subcat():
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ tmp_path = posixpath.join(tmp_dir, "intake")
+ target_catalog = copy_test_file("catalog_inherit_params.yml", tmp_path)
+ copy_test_file("catalog_nested_sub.yml", tmp_path)
+ return open_catalog(target_catalog)
=====================================
intake/source/discovery.py
=====================================
@@ -141,7 +141,7 @@ def autodiscover(path=None, plugin_prefix='intake_', do_package_scan=False):
try:
drivers[entrypoint.name] = _load_entrypoint(entrypoint)
except ConfigurationError:
- logger.exception(
+ logger.debug(
"Error while loading entrypoint %s",
entrypoint.name)
continue
@@ -218,7 +218,7 @@ def autodiscover_all(path=None, plugin_prefix='intake_', do_package_scan=True):
try:
drivers.append((entrypoint.name, _load_entrypoint(entrypoint)))
except ConfigurationError:
- logger.exception(
+ logger.debug(
"Error while loading entrypoint %s",
entrypoint.name)
continue
@@ -245,10 +245,10 @@ def _load_entrypoint(entrypoint):
"""
try:
return entrypoint.load()
- except ModuleNotFoundError as err:
+ except ImportError as err:
raise ConfigurationError(
f"Failed to load {entrypoint.name} driver because module "
- f"{entrypoint.module_name} could not be found.") from err
+ f"{entrypoint.module_name} could not be imported.") from err
except AttributeError as err:
raise ConfigurationError(
f"Failed to load {entrypoint.name} driver because no object "
=====================================
intake/source/jsonfiles.py
=====================================
@@ -0,0 +1,168 @@
+import contextlib
+import json
+from itertools import islice
+
+from intake.source.base import DataSource
+
+
+class JSONFileSource(DataSource):
+ """
+ Read JSON files as a single dictionary or list
+
+ The files can be local or remote. Extra parameters for encoding, etc.,
+ go into ``storage_options``.
+ """
+
+ name = "json"
+ version = "0.0.1"
+ container = "python"
+
+ def __init__(
+ self,
+ urlpath: str,
+ text_mode: bool = True,
+ text_encoding: str = "utf8",
+ compression: str = None,
+ read: bool = True,
+ metadata: dict = None,
+ storage_options: dict = None,
+ ):
+ """
+ Parameters
+ ----------
+ urlpath : str
+ Target file. Can include a protocol specifier (e.g., "s3://").
+ text_mode : bool
+ Whether to open the file in text mode, recoding binary
+ characters on the fly
+ text_encoding : str
+ If text_mode is True, apply this encoding. UTF* is by far the most
+ common
+ compression : str or None
+ If given, decompress the file with the given codec on load. Can
+ be something like "zip", "gzip", "bz2", or 'infer' to guess the
+ codec from the filename
+ storage_options: dict
+ Options to pass to the file reader backend, including text-specific
+ encoding arguments, and parameters specific to the remote
+ file-system driver, if using.
+ """
+ from fsspec.utils import compressions
+
+ VALID_COMPRESSIONS = list(compressions.values()) + ["infer", None]
+
+ self._urlpath = urlpath
+ self._storage_options = storage_options or {}
+ self._dataframe = None
+ self._file = None
+ self.compression = compression
+
+ if compression not in VALID_COMPRESSIONS:
+ raise ValueError(
+ f"Compression value {compression} must be one of {VALID_COMPRESSIONS}"
+ )
+ self.mode = "rt" if text_mode else "rb"
+ self.encoding = text_encoding
+ self._read = read
+
+ super(JSONFileSource, self).__init__(metadata=metadata)
+
+ def read(self):
+ import fsspec
+
+ urlpath = self._get_cache(self._urlpath)[0]
+ with fsspec.open(
+ urlpath,
+ mode=self.mode,
+ encoding=self.encoding,
+ compression=self.compression,
+ **self._storage_options,
+ ) as f:
+ return json.load(f)
+
+
+class JSONLinesFileSource(DataSource):
+ """
+ Read a JSONL (https://jsonlines.org/) file and return a list of objects,
+ each being a valid JSON object (e.g. a dictionary or list)
+ """
+
+ name = "jsonl"
+ version = "0.0.1"
+ container = "python"
+
+ def __init__(
+ self,
+ urlpath: str,
+ text_mode: bool = True,
+ text_encoding: str = "utf8",
+ compression: str = None,
+ read: bool = True,
+ metadata: dict = None,
+ storage_options: dict = None,
+ ):
+ """
+ Parameters
+ ----------
+ urlpath : str
+ Target file. Can include a protocol specifier (e.g., "s3://").
+ text_mode : bool
+ Whether to open the file in text mode, recoding binary
+ characters on the fly
+ text_encoding : str
+ If text_mode is True, apply this encoding. UTF* is by far the most
+ common
+ compression : str or None
+ If given, decompress the file with the given codec on load. Can
+ be something like "zip", "gzip", "bz2", or 'infer' to guess the
+ codec from the filename.
+ storage_options: dict
+ Options to pass to the file reader backend, including text-specific
+ encoding arguments, and parameters specific to the remote
+ file-system driver, if using.
+ """
+ from fsspec.utils import compressions
+
+ VALID_COMPRESSIONS = list(compressions.values()) + ["infer", None]
+
+ self._urlpath = urlpath
+ self._storage_options = storage_options or {}
+ self._dataframe = None
+ self._file = None
+ self.compression = compression
+ if compression not in VALID_COMPRESSIONS:
+ raise ValueError(
+ f"Compression value {compression} must be one of {VALID_COMPRESSIONS}"
+ )
+ self.mode = "rt" if text_mode else "rb"
+ self.encoding = text_encoding
+ self._read = read
+ super().__init__(metadata=metadata)
+
+ @contextlib.contextmanager
+ def _open(self):
+ """
+ Yields an fsspec.OpenFile object
+ """
+ import fsspec
+
+ urlpath = self._get_cache(self._urlpath)[0]
+ with fsspec.open(
+ urlpath,
+ mode=self.mode,
+ encoding=self.encoding,
+ compression=self.compression,
+ **self._storage_options,
+ ) as f:
+ yield f
+
+ def read(self):
+ with self._open() as f:
+ return list(map(json.loads, f))
+
+ def head(self, nrows: int = 100):
+ """
+ Return the first `nrows` lines of the file, each parsed as JSON
+ """
+ with self._open() as f:
+ return list(map(json.loads, islice(f, nrows)))
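
[With the "json" and "jsonl" entrypoints registered in setup.py further
down, the new sources become reachable through intake's generated open_*
helpers. A sketch; the file paths are hypothetical.]

    import intake

    src = intake.open_json("s3://bucket/data.json", compression="infer")
    obj = src.read()               # whole document: a dict or a list

    jl = intake.open_jsonl("data.jsonl.gz", compression="infer")
    first = jl.head(10)            # first ten lines, each parsed as JSON
    records = jl.read()            # all lines parsed into a list
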
=====================================
intake/source/tests/test_discovery.py
=====================================
@@ -53,7 +53,7 @@ def test_discover_cli(extra_pythonpath, tmp_config_path):
out = subprocess.check_output(shlex.split(
"intake drivers list"
- ), stderr=subprocess.STDOUT)
+ ), stderr=subprocess.STDOUT, env=env)
assert b'foo' in out
assert out.index(b'Not enabled') > out.index(b'foo')
=====================================
intake/source/tests/test_json.py
=====================================
@@ -0,0 +1,70 @@
+# -----------------------------------------------------------------------------
+# Copyright (c) 2012 - 2021, Anaconda, Inc. and Intake contributors
+# All rights reserved.
+#
+# The full license is in the LICENSE file, distributed with this software.
+# -----------------------------------------------------------------------------
+
+import json
+import os
+from pathlib import Path
+from typing import Dict, Optional
+
+import pytest
+from fsspec import open_files
+from fsspec.utils import compressions
+
+from intake.source.jsonfiles import JSONFileSource, JSONLinesFileSource
+
+here = os.path.abspath(os.path.dirname(__file__))
+
+EXTENSIONS = {
+ compression: f".{extension}" for extension, compression in compressions.items()
+}
+
+
+@pytest.fixture(params=[None, "gzip", "bz2"])
+def json_file(request, tmp_path) -> str:
+ data = {"hello": "world"}
+ file_path = str(tmp_path / "1.json")
+ file_path += EXTENSIONS.get(request.param, "")
+ with open_files([file_path], mode="wt", compression=request.param)[0] as f:
+ f.write(json.dumps(data))
+ return file_path
+
+
+@pytest.fixture(params=[None, "gzip", "bz2"])
+def jsonl_file(request, tmp_path) -> str:
+ data = [{"hello": "world"}, [1, 2, 3]]
+ file_path = str(tmp_path / "1.jsonl")
+ file_path += EXTENSIONS.get(request.param, "")
+ with open_files([file_path], mode="wt", compression=request.param)[0] as f:
+ f.write("\n".join(json.dumps(row) for row in data))
+ return file_path
+
+
+def test_jsonfile(json_file: str):
+ j = JSONFileSource(json_file, text_mode=True, compression="infer")
+ out = j.read()
+ assert isinstance(out, dict)
+ assert out["hello"] == "world"
+
+
+def test_jsonlfile(jsonl_file: str):
+ j = JSONLinesFileSource(jsonl_file, compression="infer")
+ out = j.read()
+ assert isinstance(out, list)
+
+ assert isinstance(out[0], dict)
+ assert out[0]["hello"] == "world"
+
+ assert isinstance(out[1], list)
+ assert out[1] == [1, 2, 3]
+
+
+def test_jsonl_head(jsonl_file: str):
+ j = JSONLinesFileSource(jsonl_file, compression="infer")
+ out = j.head(1)
+ assert isinstance(out, list)
+ assert len(out) == 1
+ assert out[0]["hello"] == "world"
=====================================
intake/source/tests/test_tiled.py
=====================================
@@ -0,0 +1,36 @@
+import intake
+import pytest
+import shlex
+import subprocess
+import time
+pytest.importorskip("tiled")
+
+import httpx # required by tiled, so will be here
+
+
+@pytest.fixture()
+def server():
+ cmd = shlex.split("tiled serve pyobject --public tiled.examples.generated:tree")
+ P = subprocess.Popen(cmd)
+ url = "http://localhost:8000"
+ timeout = 10
+ while True:
+ try:
+ r = httpx.get(url)
+ if r.status_code == 200:
+ break
+ except:
+ pass
+ timeout -= 0.1
+ assert timeout > 0, "timeout waiting for Tiled server"
+ time.sleep(0.1)
+ yield url
+ P.terminate()
+ P.wait()
+
+
+def test_simple(server):
+ cat = intake.open_tiled_cat(server)
+ out = cat.tiny_image.read()
+ assert out.shape == (10, 10)
+ assert out.all()
=====================================
intake/source/tiled.py
=====================================
@@ -0,0 +1,136 @@
+from tiled.client import from_uri
+from tiled.client.node import Node
+from intake.catalog import Catalog
+from intake.source import DataSource
+
+
+class TiledCatalog(Catalog):
+ """View Tiled server as a catalog
+
+ See the documentation for setting up such a server at
+ https://blueskyproject.io/tiled/
+
+ A tiled server may contain sources of dataframe, array or xarray type.
+ This driver exposes the full tree served by the server, but you
+ can also specify a sub-path of that tree.
+ """
+
+ name = "tiled_cat"
+
+ def __init__(self, server, path=None):
+ """
+
+ Parameters
+ ----------
+ server: str or tiled.client.node.Node
+ Location of the tiled server. Usually of the form "http[s]://address:port/"
+ May include a path. If the protocol is "tiled", we assume HTTP
+ connection. Alternatively, can be a Node instance, already connected
+ to a server.
+ path: str (optional)
+ If given, restrict the catalog to this part of the server's catalog
+ tree. Equivalent to extending the server URL.
+ """
+ self.path = path
+ if isinstance(server, str):
+ if server.startswith("tiled"):
+ uri = server.replace("tiled", "http", 1)
+ else:
+ uri = server
+ client = from_uri(uri, "dask")
+ else:
+ client = server
+ uri = server.uri
+ self.uri = uri
+ if path is not None:
+ client = client[path]
+ super().__init__(entries=client, name="tiled:" + uri.split(":", 1)[1])
+
+ def search(self, query, type="text"):
+ """Full text search
+
+ Queries other than full text will be added later
+ """
+ if type == "text":
+ from tiled.queries import FullText
+ q = FullText(query)
+ else:
+ raise NotImplementedError
+ return TiledCatalog.from_dict(self._entries.search(q), uri=self.uri, path=self.path)
+
+ def __getitem__(self, item):
+ node = self._entries[item]
+ if isinstance(node, Node):
+ return TiledCatalog(node)
+ else:
+ return TiledSource(uri=self.uri, path=item, instance=node)
+
+
+types = {
+ "DaskArrayClient": "ndarray",
+ "DaskDataArrayClient": "xarray",
+ "DaskDatasetClient": "xarray",
+ "DaskVariableClient": "xarray",
+ "DaskDataFrameClient": "dataframe"
+}
+
+
+class TiledSource(DataSource):
+ """A source on a Tiled server
+
+ The container type of this source is determined at runtime.
+ The attribute ``.instance`` gives access to the underlying Tiled
+ API, but most users will only call ``.to_dask()``.
+ """
+
+ name = "tiled"
+
+ def __init__(self, uri="", path="", instance=None, metadata=None):
+ """
+
+ Parameters
+ ----------
+ uri: str (optional)
+ Location of the server. If ``instance`` is given, this is
+ only used for the repr
+ path: str (optional)
+ Path of the data source within the server tree. If ``instance``
+ is given, this is only used for the repr
+ instance: tiled.client.node.Node (optional)
+ The tiled object pointing to the data source; normally created
+ by a ``TiledCatalog``
+ metadata: dict
+ Extra metadata for this source; metadata will also be provided
+ by the server.
+ """
+ if instance is None:
+ instance = from_uri(uri, "dask")[path].read()
+ self.instance = instance
+ md = dict(instance.metadata)
+ if metadata:
+ md.update(metadata)
+ super().__init__(metadata=md)
+ self.name = path
+ self.container = types[type(self.instance).__name__]
+
+ def discover(self):
+ x = self.to_dask()
+ dt = getattr(x, "dtype", None) or getattr(x, "dtypes", None)
+ parts = getattr(x, "npartitions", None) or x.data.npartitions
+ return dict(dtype=dt,
+ shape=getattr(self.instance.structure().macro, "shape", x.shape),
+ npartitions=parts,
+ metadata=self.metadata)
+
+ def to_dask(self):
+ # cache this?
+ return self.instance.read()
+
+ def read(self):
+ return self.instance.read().compute()
+
+ def _yaml(self):
+ y = super()._yaml()
+ v = list(y['sources'].values())[0]
+ v['args'].pop('instance')
+ return y
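
[Connecting through the new driver might look like the following, mirroring
test_tiled.py above; the server URL and the "tiny_image" entry come from
tiled's generated example tree.]

    import intake

    cat = intake.open_tiled_cat("http://localhost:8000")  # or "tiled://localhost:8000"
    print(list(cat))                  # the server tree as catalog entries
    arr = cat["tiny_image"].read()    # container decided at runtime (ndarray here)
    dask_arr = cat["tiny_image"].to_dask()
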
=====================================
intake/tests/catalog_inherit_params.yml
=====================================
@@ -32,3 +32,12 @@ sources:
default: local_filename.parquet
args:
urlpath: s3://{{bucket}}/{{filename}}
+ subcat:
+ driver: yaml_file_cat
+ args:
+ path: "{{CATALOG_DIR}}/catalog_nested_sub.yml"
+ user_parameters:
+ inner:
+ type: str
+ description: description
+ default: test_name
=====================================
intake/tests/catalog_nested_sub.yml
=====================================
@@ -4,3 +4,8 @@ sources:
driver: csv
args:
urlpath: ""
+ ex2:
+ description: this is a sub-resource
+ driver: csv
+ args:
+ urlpath: "{{bucket}}/{{inner}}"
=====================================
requirements.txt
=====================================
@@ -2,5 +2,5 @@ appdirs
dask
entrypoints
pyyaml
-fsspec >=0.7.4
+fsspec >=2021.7.0
jinja2
=====================================
scripts/ci/environment-py37.yml
=====================================
@@ -7,7 +7,7 @@ dependencies:
- aiohttp
- flask
- appdirs
- - dask=2021.06.0
+ - dask
- jinja2
- numpy
- pyyaml
=====================================
scripts/ci/environment-py39.yml
=====================================
@@ -36,5 +36,8 @@ dependencies:
- xarray
- zarr
- moto
+ - httpx
+ - typer
- pip:
- rangehttpserver
+ - tiled[all]
=====================================
setup.py
=====================================
@@ -6,8 +6,10 @@
# The full license is in the LICENSE file, distributed with this software.
#-----------------------------------------------------------------------------
-from setuptools import setup, find_packages
import sys
+
+from setuptools import find_packages, setup
+
import versioneer
requires = [line.strip() for line in open('requirements.txt').readlines()
@@ -49,12 +51,16 @@ setup(
'yaml_files_cat = intake.catalog.local:YAMLFilesCatalog',
'csv = intake.source.csv:CSVSource',
'textfiles = intake.source.textfiles:TextFilesSource',
+ 'json = intake.source.jsonfiles:JSONFileSource',
+ 'jsonl = intake.source.jsonfiles:JSONLinesFileSource',
'catalog = intake.catalog.base:Catalog',
'intake_remote = intake.catalog.remote:RemoteCatalog',
'numpy = intake.source.npy:NPySource',
'ndzarr = intake.source.zarr:ZarrArraySource',
'zarr_cat = intake.catalog.zarr:ZarrGroupCatalog',
'alias = intake.source.derived:AliasSource',
+ 'tiled_cat = intake.source.tiled:TiledCatalog',
+ 'tiled = intake.source.tiled:TiledSource'
]
},
classifiers=[
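
[Third-party packages hook into the same "intake.drivers" entrypoint group;
a hypothetical plugin's setup.py would mirror the block above. Package and
module names here are invented.]

    from setuptools import setup

    setup(
        name="intake-mydriver",               # hypothetical package
        packages=["intake_mydriver"],
        entry_points={
            "intake.drivers": [
                "mydriver = intake_mydriver.source:MySource",
            ],
        },
    )
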
=====================================
versioneer.py
=====================================
@@ -339,9 +339,8 @@ def get_config_from_root(root):
# configparser.NoOptionError (if it lacks "VCS="). See the docstring at
# the top of versioneer.py for instructions on writing your setup.cfg .
setup_cfg = os.path.join(root, "setup.cfg")
- parser = configparser.SafeConfigParser()
- with open(setup_cfg, "r") as f:
- parser.readfp(f)
+ parser = configparser.ConfigParser()
+ parser.read(setup_cfg)
VCS = parser.get("versioneer", "VCS") # mandatory
def get(parser, name):
View it on GitLab: https://salsa.debian.org/med-team/intake/-/commit/1a4217212c9ae9883f7f88c459fe74fa1658b2f2