[Git][debian-gis-team/python-s3fs][upstream] New upstream version 2026.3.0
Antonio Valentino (@antonio.valentino)
gitlab at salsa.debian.org
Sat Apr 4 15:08:14 BST 2026
Antonio Valentino pushed to branch upstream at Debian GIS Project / python-s3fs
Commits:
5645f416 by Antonio Valentino at 2026-04-04T14:02:52+00:00
New upstream version 2026.3.0
- - - - -
5 changed files:
- docs/source/changelog.rst
- requirements.txt
- s3fs/_version.py
- s3fs/core.py
- s3fs/tests/test_s3fs.py
Changes:
=====================================
docs/source/changelog.rst
=====================================
@@ -1,6 +1,14 @@
Changelog
=========
+2026.3.0
+--------
+
+- allow _find with both prefix/withdirs, allows prefix in glob (#1014)
+- deduplicate file listing with bisect (#1010)
+- check for file expiry with ETag if server doesn't support If-Match (#1008)
+- concurrent downloads in get/cat_file (#1007)
+
2026.2.0
--------
=====================================
requirements.txt
=====================================
@@ -1,3 +1,3 @@
aiobotocore>=2.19.0,<4.0.0
-fsspec==2026.2.0
-aiohttp!=4.0.0a0, !=4.0.0a1
+fsspec==2026.3.0
+aiohttp!=4.0.0a0, !=4.0.0a1, >=3.9.0
=====================================
s3fs/_version.py
=====================================
@@ -25,9 +25,9 @@ def get_keywords() -> Dict[str, str]:
# setup.py/versioneer.py will grep for the variable names, so they must
# each be defined on a line of their own. _version.py will just call
# get_keywords().
- git_refnames = " (HEAD -> main, tag: 2026.2.0)"
- git_full = "1181d335955418f081a1d0b94c3d8350cea0751f"
- git_date = "2026-02-05 16:57:01 -0500"
+ git_refnames = " (HEAD -> main, tag: 2026.3.0)"
+ git_full = "731e1250bcd4f682e1ccce03b01641910e7646fc"
+ git_date = "2026-03-27 15:27:37 -0400"
keywords = {"refnames": git_refnames, "full": git_full, "date": git_date}
return keywords
=====================================
s3fs/core.py
=====================================
@@ -1,4 +1,5 @@
import asyncio
+import bisect
import errno
import io
import logging
@@ -323,15 +324,21 @@ class S3FileSystem(AsyncFileSystem):
For example: aiobotocore.session.AioSession(profile='test_user')
max_concurrency : int (10)
The maximum number of concurrent transfers to use per file for multipart
- upload (``put()``) operations. Defaults to 10. When used in
- conjunction with ``S3FileSystem.put(batch_size=...)`` the maximum number of
- simultaneous connections is ``max_concurrency * batch_size``. We may extend
- this parameter to affect ``pipe()``, ``cat()`` and ``get()``. Increasing this
- value will result in higher memory usage during multipart upload operations (by
- ``max_concurrency * chunksize`` bytes per file).
+ upload (``put()``, ``pipe()``), download (``get()``) and read
+ (``cat()``) operations. Defaults to 10. When used in conjunction
+ with ``S3FileSystem.put(batch_size=...)`` the maximum number of
+ simultaneous connections is ``max_concurrency * batch_size``.
+ Increasing this value will result in higher
+ memory usage during multipart transfer operations (by
+ ``max_concurrency * chunksize`` bytes per file), and may result in timeouts on slow
+ networks. Set to 1 if you are having connection issues.
fixed_upload_size : bool (False)
Use same chunk size for all parts in multipart upload (last part can be smaller).
Cloudflare R2 storage requires fixed_upload_size=True for multipart uploads.
+ local_expiry_check : bool (False)
+ Perform expiry checks when using range reads locally instead of having the
+ server perform them. VAST S3 requires local_expiry_check=True because
+ it doesn't support the `If-Match` http header used for expiry check on the server.
The following parameters are passed on to fsspec:
@@ -381,6 +388,7 @@ class S3FileSystem(AsyncFileSystem):
loop=None,
max_concurrency=10,
fixed_upload_size: bool = False,
+ local_expiry_check: bool = False,
**kwargs,
):
if key and username:
@@ -419,6 +427,7 @@ class S3FileSystem(AsyncFileSystem):
self._s3 = None
self.session = session
self.fixed_upload_size = fixed_upload_size
+ self.local_expiry_check = local_expiry_check
if max_concurrency < 1:
raise ValueError("max_concurrency must be >= 1")
self.max_concurrency = max_concurrency
@@ -937,25 +946,53 @@ class S3FileSystem(AsyncFileSystem):
when used by glob, but users usually only want files.
prefix: str
Only return files that match ``^{path}/{prefix}`` (if there is an
- exact match ``filename == {path}/{prefix}``, it also will be included)
+ exact match ``filename == {path}/{prefix}``, it also will be included).
+ Can be combined with ``withdirs`` and ``maxdepth``.
"""
path = self._strip_protocol(path)
bucket, key, _ = self.split_path(path)
if not bucket:
raise ValueError("Cannot traverse all of S3")
- if (withdirs or maxdepth) and prefix:
- # TODO: perhaps propagate these to a glob(f"path/{prefix}*") call
- raise ValueError(
- "Can not specify 'prefix' option alongside 'withdirs'/'maxdepth' options."
- )
if maxdepth:
- return await super()._find(
- bucket + "/" + key,
- maxdepth=maxdepth,
- withdirs=withdirs,
- detail=detail,
- **kwargs,
+ if not prefix:
+ return await super()._find(
+ bucket + "/" + key,
+ maxdepth=maxdepth,
+ withdirs=withdirs,
+ detail=detail,
+ **kwargs,
+ )
+ # maxdepth + prefix: one delimiter-based listing for the first level
+ # (server-side prefix filter), then recurse into matching subdirs
+ # normally — avoids fetching all nested objects up front.
+ first_level = await self._lsdir(
+ path, delimiter="/", prefix=prefix, **kwargs
)
+ files = [o for o in first_level if o["type"] != "directory"]
+ dirs = [o for o in first_level if o["type"] == "directory"]
+ out = list(files)
+ out_dirs = list(dirs)
+ if maxdepth > 1:
+ for d in dirs:
+ sub = await self._find(
+ d["name"],
+ maxdepth=maxdepth - 1,
+ withdirs=withdirs,
+ detail=True,
+ **kwargs,
+ )
+ for name, info in sub.items():
+ if name == d["name"]:
+ continue # root dir already in out_dirs
+ if info["type"] == "directory":
+ out_dirs.append(info)
+ else:
+ out.append(info)
+ if withdirs:
+ out = sorted(out + out_dirs, key=lambda x: x["name"])
+ if detail:
+ return {o["name"]: o for o in out}
+ return [o["name"] for o in out]
# TODO: implement find from dircache, if all listings are present
# if refresh is False:
# out = incomplete_tree_dirs(self.dircache, path)
@@ -1008,11 +1045,14 @@ class S3FileSystem(AsyncFileSystem):
# Explicitly add directories to their parents in the dircache
for d in dirs:
par = self._parent(d["name"])
- # extra condition here (in any()) to deal with directory-marking files
- if par in thisdircache and not any(
- _["name"] == d["name"] for _ in thisdircache[par]
- ):
- thisdircache[par].append(d)
+ if par in thisdircache:
+ # listing is sorted
+ ind = bisect.bisect_right(
+ thisdircache[par], d["name"], key=lambda _: _["name"]
+ )
+ if thisdircache[par][ind - 1]["name"] != d["name"]:
+ # name is not already present
+ thisdircache[par].insert(ind, d)
if not prefix:
for k, v in thisdircache.items():
@@ -1233,7 +1273,15 @@ class S3FileSystem(AsyncFileSystem):
touch = sync_wrapper(_touch)
- async def _cat_file(self, path, version_id=None, start=None, end=None):
+ async def _cat_file(
+ self,
+ path,
+ version_id=None,
+ start=None,
+ end=None,
+ chunksize=None,
+ max_concurrency=None,
+ ):
bucket, key, vers = self.split_path(path)
if start is not None or end is not None:
head = {"Range": await self._process_limits(path, start, end)}
@@ -1254,8 +1302,75 @@ class S3FileSystem(AsyncFileSystem):
finally:
resp["Body"].close()
+ if (
+ start is None
+ and end is None
+ and (max_concurrency or self.max_concurrency) > 1
+ ):
+ chunksize = chunksize or self.default_block_size
+ resp = await self._call_s3(
+ "get_object",
+ Bucket=bucket,
+ Key=key,
+ **version_id_kw(version_id or vers),
+ **self.req_kw,
+ )
+ content_length = resp.get("ContentLength", None)
+ resp["Body"].close()
+
+ if content_length and content_length > chunksize:
+ return await self._cat_file_concurrent(
+ bucket,
+ key,
+ content_length,
+ chunksize,
+ max_concurrency=max_concurrency,
+ version_id=version_id or vers,
+ )
+
return await _error_wrapper(_call_and_read, retries=self.retries)
+ async def _cat_file_concurrent(
+ self,
+ bucket,
+ key,
+ content_length,
+ chunksize,
+ max_concurrency=None,
+ version_id=None,
+ ):
+ max_concurrency = max_concurrency or self.max_concurrency
+
+ async def _read_chunk(start, end):
+ kw = self.req_kw.copy()
+ kw["Range"] = f"bytes={start}-{end}"
+ resp = await self._call_s3(
+ "get_object",
+ Bucket=bucket,
+ Key=key,
+ **version_id_kw(version_id),
+ **kw,
+ )
+ data = await resp["Body"].read()
+ resp["Body"].close()
+ return start, data
+
+ ranges = list(_get_brange(content_length, chunksize))
+ inds = list(range(0, len(ranges), max_concurrency)) + [len(ranges)]
+
+ buf = bytearray(content_length)
+ for batch_start, batch_stop in zip(inds[:-1], inds[1:]):
+ results = await asyncio.gather(
+ *[
+ _read_chunk(start, end)
+ for start, end in ranges[batch_start:batch_stop]
+ ]
+ )
+ for offset, data in results:
+ buf[offset : offset + len(data)] = data
+
+ return bytes(buf)
+
async def _pipe_file(
self,
path,
@@ -1444,8 +1559,60 @@ class S3FileSystem(AsyncFileSystem):
)
return out
+ async def _download_file_part_concurrent(
+ self,
+ bucket,
+ key,
+ lpath,
+ content_length,
+ chunksize,
+ callback=_DEFAULT_CALLBACK,
+ max_concurrency=None,
+ version_id=None,
+ ):
+ max_concurrency = max_concurrency or self.max_concurrency
+
+ async def _download_chunk(start, end):
+ kw = self.req_kw.copy()
+ kw["Range"] = f"bytes={start}-{end}"
+ resp = await self._call_s3(
+ "get_object",
+ Bucket=bucket,
+ Key=key,
+ **version_id_kw(version_id),
+ **kw,
+ )
+ data = await resp["Body"].read()
+ resp["Body"].close()
+ return start, data
+
+ ranges = list(_get_brange(content_length, chunksize))
+ inds = list(range(0, len(ranges), max_concurrency)) + [len(ranges)]
+
+ with open(lpath, "wb") as f0:
+ f0.truncate(content_length)
+
+ for batch_start, batch_stop in zip(inds[:-1], inds[1:]):
+ results = await asyncio.gather(
+ *[
+ _download_chunk(start, end)
+ for start, end in ranges[batch_start:batch_stop]
+ ]
+ )
+ for offset, data in results:
+ f0.seek(offset)
+ f0.write(data)
+ callback.relative_update(len(data))
+
async def _get_file(
- self, rpath, lpath, callback=_DEFAULT_CALLBACK, version_id=None, **kwargs
+ self,
+ rpath,
+ lpath,
+ callback=_DEFAULT_CALLBACK,
+ version_id=None,
+ chunksize=None,
+ max_concurrency=None,
+ **kwargs,
):
if os.path.isdir(lpath):
return
@@ -1467,6 +1634,26 @@ class S3FileSystem(AsyncFileSystem):
body, content_length = await _open_file(range=0)
callback.set_size(content_length)
+ chunksize = chunksize or self.default_block_size
+
+ if (
+ content_length
+ and content_length > chunksize
+ and (max_concurrency or self.max_concurrency) > 1
+ ):
+ body.close()
+ return await self._download_file_part_concurrent(
+ bucket,
+ key,
+ lpath,
+ content_length,
+ chunksize,
+ callback=callback,
+ max_concurrency=max_concurrency,
+ version_id=version_id or vers,
+ )
+
+ # Sequential download for small files or concurrency=1
failed_reads = 0
bytes_read = 0
@@ -2414,7 +2601,12 @@ class S3File(AbstractBufferedFile):
# Reflect head
self.s3_additional_kwargs.update(head)
- if "r" in mode and size is None and "ETag" in self.details:
+ if (
+ "r" in mode
+ and size is None
+ and "ETag" in self.details
+ and not s3.local_expiry_check
+ ):
self.req_kw["IfMatch"] = self.details["ETag"]
def _call_s3(self, method, *kwarglist, **kwargs):
@@ -2495,6 +2687,7 @@ class S3File(AbstractBufferedFile):
start,
end,
req_kw=self.req_kw,
+ details=self.details,
)
except OSError as ex:
@@ -2643,9 +2836,11 @@ class S3AsyncStreamedFile(AbstractAsyncStreamedFile):
return out
-def _fetch_range(fs, bucket, key, version_id, start, end, req_kw=None):
+def _fetch_range(fs, bucket, key, version_id, start, end, req_kw=None, details=None):
if req_kw is None:
req_kw = {}
+ if details is None:
+ details = {}
if start == end:
logger.debug(
"skip fetch for negative range - bucket=%s,key=%s,start=%d,end=%d",
@@ -2656,10 +2851,17 @@ def _fetch_range(fs, bucket, key, version_id, start, end, req_kw=None):
)
return b""
logger.debug("Fetch: %s/%s, %s-%s", bucket, key, start, end)
- return sync(fs.loop, _inner_fetch, fs, bucket, key, version_id, start, end, req_kw)
+ return sync(
+ fs.loop, _inner_fetch, fs, bucket, key, version_id, start, end, req_kw, details
+ )
-async def _inner_fetch(fs, bucket, key, version_id, start, end, req_kw=None):
+async def _inner_fetch(
+ fs, bucket, key, version_id, start, end, req_kw=None, details=None
+):
+ if details is None:
+ details = {}
+
async def _call_and_read():
resp = await fs._call_s3(
"get_object",
@@ -2669,6 +2871,12 @@ async def _inner_fetch(fs, bucket, key, version_id, start, end, req_kw=None):
**version_id_kw(version_id),
**req_kw,
)
+ if (
+ fs.local_expiry_check
+ and "ETag" in details
+ and details["ETag"] != resp.get("ETag")
+ ):
+ raise FileExpired(filename=details["name"], e_tag=details.get("ETag"))
try:
return await resp["Body"].read()
finally:
=====================================
s3fs/tests/test_s3fs.py
=====================================
@@ -20,7 +20,7 @@ from dateutil.tz import tzutc
import botocore
import s3fs.core
from s3fs.core import MAX_UPLOAD_PARTS, S3FileSystem, calculate_chunksize
-from s3fs.utils import ignoring, SSEParams
+from s3fs.utils import ignoring, SSEParams, FileExpired
from botocore.exceptions import NoCredentialsError
from fsspec.asyn import sync
from fsspec.callbacks import Callback
@@ -148,14 +148,6 @@ def s3(s3_base):
yield s3
-@contextmanager
-def expect_errno(expected_errno):
- """Expect an OSError and validate its errno code."""
- with pytest.raises(OSError) as error:
- yield
- assert error.value.errno == expected_errno, "OSError has wrong error code."
-
-
def test_simple(s3):
data = b"a" * (10 * 2**20)
@@ -1009,6 +1001,43 @@ def test_get_file_with_kwargs(s3, tmpdir):
)
+@pytest.mark.parametrize("size", [2**10, 10 * 2**20])
+def test_get_file_parallel_callback(s3, tmpdir, size):
+ data = os.urandom(size)
+ s3.pipe(test_bucket_name + "/parallel_test", data)
+
+ test_file = str(tmpdir.join("parallel_test"))
+ cb = Callback()
+ s3.get_file(
+ test_bucket_name + "/parallel_test",
+ test_file,
+ callback=cb,
+ max_concurrency=4,
+ chunksize=5 * 2**20,
+ )
+ with open(test_file, "rb") as f:
+ assert f.read() == data
+ assert cb.size == len(data)
+ assert cb.value == cb.size
+
+
+@pytest.mark.parametrize("factor", [1, 5, 6])
+def test_get_file_parallel_integrity(s3, tmpdir, factor):
+ chunksize = 5 * 2**20
+ data = os.urandom(chunksize * factor)
+ s3.pipe(test_bucket_name + "/parallel_integrity", data)
+
+ test_file = str(tmpdir.join("parallel_integrity"))
+ s3.get_file(
+ test_bucket_name + "/parallel_integrity",
+ test_file,
+ max_concurrency=5,
+ chunksize=chunksize,
+ )
+ with open(test_file, "rb") as f:
+ assert f.read() == data
+
+
@pytest.mark.parametrize("size", [2**10, 10 * 2**20])
def test_put_file_with_callback(s3, tmpdir, size):
test_file = str(tmpdir.join("test.json"))
@@ -1046,6 +1075,20 @@ def test_pipe_cat_big(s3, size):
assert s3.cat(test_bucket_name + "/bigfile") == data
+@pytest.mark.parametrize("factor", [1, 5, 6])
+def test_cat_file_parallel(s3, factor):
+ chunksize = 5 * 2**20
+ data = os.urandom(chunksize * factor)
+ s3.pipe(test_bucket_name + "/cat_parallel", data)
+
+ result = s3.cat_file(
+ test_bucket_name + "/cat_parallel",
+ max_concurrency=5,
+ chunksize=chunksize,
+ )
+ assert result == data
+
+
def test_errors(s3):
with pytest.raises(FileNotFoundError):
s3.open(test_bucket_name + "/tmp/test/shfoshf", "rb")
@@ -1106,6 +1149,19 @@ def test_errors_cause_preservings(monkeypatch, s3):
assert exc.value.__cause__ is None
+def test_local_expiry_check(s3):
+ s3 = S3FileSystem(
+ local_expiry_check=True,
+ anon=False,
+ client_kwargs={"endpoint_url": endpoint_uri},
+ )
+ path = test_bucket_name + "/test/accounts.1.json"
+
+ with s3.open(path, "r") as f:
+ f.read(10)
+ f.read(10)
+
+
def test_read_small(s3):
fn = test_bucket_name + "/2014-01-01.csv"
with s3.open(fn, "rb", block_size=10, cache_type="bytes") as f:
@@ -2381,7 +2437,13 @@ def test_get_file_info_with_selector(s3):
condition=version.parse(moto.__version__) <= version.parse("1.3.16"),
reason="Moto 1.3.16 is not supporting pre-conditions.",
)
-def test_raise_exception_when_file_has_changed_during_reading(s3):
+@pytest.mark.parametrize("local_check", [False, True])
+def test_raise_exception_when_file_has_changed_during_reading(s3, local_check):
+ s3 = S3FileSystem(
+ local_expiry_check=local_check,
+ anon=False,
+ client_kwargs={"endpoint_url": endpoint_uri},
+ )
test_file_name = "file1"
test_file = "s3://" + test_bucket_name + "/" + test_file_name
content1 = b"123"
@@ -2402,7 +2464,7 @@ def test_raise_exception_when_file_has_changed_during_reading(s3):
with s3.open(test_file, "rb") as f:
create_file(content2)
- with expect_errno(errno.EBUSY):
+ with pytest.raises(FileExpired):
f.read()
@@ -2563,6 +2625,46 @@ def test_find_with_prefix(s3):
)
+def test_find_with_prefix_and_withdirs(s3):
+ # Issue #1013: prefix must be combinable with withdirs (used by _glob internally)
+ for cursor in range(10):
+ s3.touch(test_bucket_name + f"/wdpfx/sub/file_{cursor}")
+
+ # withdirs=True + prefix should work and include synthesised directory entries
+ result = s3.find(test_bucket_name, prefix="wdpfx", withdirs=True)
+ assert test_bucket_name + "/wdpfx/sub" in result
+ assert all(
+ r.startswith(test_bucket_name + "/wdpfx") for r in result
+ ), "prefix filter must be respected"
+ assert len([r for r in result if "file_" in r]) == 10
+
+ # prefix alone (withdirs=False default) must still work
+ files_only = s3.find(test_bucket_name + "/wdpfx/sub/", prefix="file_")
+ assert len(files_only) == 10
+
+
+def test_find_with_prefix_and_maxdepth(s3):
+ # Issue #1013: prefix must be combinable with maxdepth
+ for cursor in range(5):
+ s3.touch(test_bucket_name + f"/mxpfx/sub/file_{cursor}")
+ s3.touch(test_bucket_name + "/mxpfx_top")
+
+ # maxdepth=1 from test_bucket_name: only direct children (depth 1) are returned
+ # test_bucket_name/mxpfx_top is at depth 1, test_bucket_name/mxpfx/sub/file_* are at depth 3
+ result = s3.find(test_bucket_name, prefix="mxpfx", maxdepth=1)
+ assert test_bucket_name + "/mxpfx_top" in result
+ assert not any("file_" in r for r in result), "depth-2+ files must be excluded"
+
+ # maxdepth=2: picks up test_bucket_name/mxpfx/sub (depth 2 dir) but not files inside
+ result2 = s3.find(test_bucket_name, prefix="mxpfx", maxdepth=2, withdirs=True)
+ assert test_bucket_name + "/mxpfx/sub" in result2
+ assert not any("file_" in r for r in result2), "depth-3 files must be excluded"
+
+ # maxdepth=3: all files now reachable
+ result3 = s3.find(test_bucket_name, prefix="mxpfx", maxdepth=3)
+ assert len([r for r in result3 if "file_" in r]) == 5
+
+
def test_list_after_find(s3):
before = s3.ls("s3://test")
s3.invalidate_cache("s3://test/2014-01-01.csv")
View it on GitLab: https://salsa.debian.org/debian-gis-team/python-s3fs/-/commit/5645f4164a33f4b84b7b868730f7bdbb77177322
--
View it on GitLab: https://salsa.debian.org/debian-gis-team/python-s3fs/-/commit/5645f4164a33f4b84b7b868730f7bdbb77177322
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-grass-devel/attachments/20260404/024b8c01/attachment-0001.htm>
More information about the Pkg-grass-devel
mailing list