[Git][debian-gis-team/pyshp][upstream] 3 commits: New upstream version 3.0.6
Bas Couwenberg (@sebastic)
gitlab at salsa.debian.org
Sat May 23 18:57:21 BST 2026
Bas Couwenberg pushed to branch upstream at Debian GIS Project / pyshp
Commits:
0a63c33a by Bas Couwenberg at 2026-05-23T19:39:49+02:00
New upstream version 3.0.6
- - - - -
4ee78d08 by Bas Couwenberg at 2026-05-23T19:40:00+02:00
New upstream version 3.0.7
- - - - -
43f51f32 by Bas Couwenberg at 2026-05-23T19:40:14+02:00
New upstream version 3.0.8
- - - - -
7 changed files:
- README.md
- changelog.txt
- pyproject.toml
- + shapefiles/test/ATTRIBUTION
- + shapefiles/test/REL.zip
- src/shapefile.py
- test_shapefile.py
Changes:
=====================================
README.md
=====================================
@@ -7,9 +7,9 @@ The Python Shapefile Library (PyShp) reads and writes ESRI Shapefiles in pure Py

- **Author**: [Joel Lawhead](https://github.com/GeospatialPython)
-- **Maintainers**: [Karim Bahgat](https://github.com/karimbahgat)
-- **Version**: 3.0.3.dev0
-- **Date**: 10th October, 2025
+- **Maintainers**: [James Parrott](https://github.com/JamesParrott) & [Karim Bahgat](https://github.com/karimbahgat)
+- **Version**: 3.0.8
+- **Date**: 20th May 2026
- **License**: [MIT](https://github.com/GeospatialPython/pyshp/blob/master/LICENSE.TXT)
## Contents
@@ -93,6 +93,29 @@ part of your geospatial project.
# Version Changes
+## 3.0.8
+
+### Testability / separation of concerns.
+ - Separate dbf only writing methods into a new dbfWriter class (an instance of which is owned by the regular Shapefile Writer class).
+
+## 3.0.7
+
+### Testability / separation of concerns.
+ - Separate dbf only reading methods into a new dbfReader class (an instance of which is owned by the regular Shapefile Reader class).
+
+
+## 3.0.6
+
+### URL Downloading
+ - Unify tempfile creation and shapefile download logic.
+ - Check "Content-Type" header and sniff initial bytes
+ of response in order to possibly reject html responses, before parsing as a shapefile, to give a more useful error to users.
+ - Special case shapefiles hosted in Github repos to suggest appending the query string `?raw=true`.
+
+### Testing:
+ - Add shapefile from Open Natural Hazard Modelling ([Paula Spannring](https://github.com/PaulaSp3)
+ and [Felix Oesterle, Austrian Research Centre for Forests](https://orcid.org/0000-0002-7772-6884)).
+
## 3.0.5
### Project structure:
@@ -1594,6 +1617,7 @@ davidh-ssec
Edward Kawas
Evan Heidtmann
ezcitron
+Felix Oesterle
fiveham
geospatialpython
Hannes
@@ -1616,6 +1640,7 @@ Mike Toews
Miroslav Šedivý
Nilo
pakoun
+Paula Spannring
Paulo Ernesto
Raynor Vliegendhart
Razzi Abuissa
=====================================
changelog.txt
=====================================
@@ -1,3 +1,31 @@
+VERSION 3.0.8
+
+2026-05-20
+ Testability / separation of concerns:
+ * Separate dbf only writing methods into a new dbfWriter class (an instance of which is owned by the regular Shapefile Writer class).
+
+VERSION 3.0.7
+
+2026-05-20
+ Testability / separation of concerns:
+ * Separate dbf only reading methods into a new dbfReader class (an instance of which is owned by the regular Shapefile Reader class).
+
+
+VERSION 3.0.6
+
+2026-05-19
+ URL Downloading:
+ * Unify tempfile creation and shapefile download logic.
+ * Check "Content-Type" header and sniff initial bytes
+ of response in order to possibly reject html responses, before parsing as a shapefile, to give a more useful error to users.
+ * Special case shapefiles hosted in Github repos to suggest appending the query string `?raw=true`.
+
+
+2026-05-18
+ Testing:
+ * Add shapefile from Open Natural Hazard Modelling ([Paula Spannring](https://github.com/PaulaSp3)
+ and [Felix Oesterle, Austrian Research Centre for Forests](https://orcid.org/0000-0002-7772-6884)).
+
VERSION 3.0.5
2026-05-18
=====================================
pyproject.toml
=====================================
@@ -8,7 +8,8 @@ authors = [
{name = "Joel Lawhead", email = "jlawhead at geospatialpython.com"},
]
maintainers = [
- {name = "Karim Bahgat", email = "karim.bahgat.norway at gmail.com"}
+ {name = "James Parrott", email = "james at jamesparrott.dev"},
+ {name = "Karim Bahgat", email = "karim.bahgat.norway at gmail.com"},
]
readme = "README.md"
keywords = ["gis", "geospatial", "geographic", "shapefile", "shapefiles"]
=====================================
shapefiles/test/ATTRIBUTION
=====================================
Binary files /dev/null and b/shapefiles/test/ATTRIBUTION differ
=====================================
shapefiles/test/REL.zip
=====================================
Binary files /dev/null and b/shapefiles/test/REL.zip differ
=====================================
src/shapefile.py
=====================================
@@ -2,24 +2,27 @@
shapefile.py
Provides read and write support for ESRI Shapefiles.
authors: jlawhead<at>geospatialpython.com
-maintainer: karim.bahgat.norway<at>gmail.com
+maintainers: james<at>jamesparrott.dev and karim.bahgat.norway<at>gmail.com
Compatible with Python versions >=3.9
"""
from __future__ import annotations
-__version__ = "3.0.5"
+__version__ = "3.0.8"
import array
import doctest
+import functools
import io
import logging
import os
import sys
import tempfile
import time
+import warnings
import zipfile
from collections.abc import Container, Iterable, Iterator, Reversible, Sequence
+from contextlib import AbstractContextManager, ExitStack
from datetime import date
from os import PathLike
from struct import Struct, calcsize, error, pack, unpack
@@ -31,7 +34,6 @@ from typing import (
Generic,
Literal,
NamedTuple,
- NoReturn,
Optional,
Protocol,
SupportsIndex,
@@ -42,9 +44,12 @@ from typing import (
overload,
)
from urllib.error import HTTPError
-from urllib.parse import urlparse, urlunparse
+from urllib.parse import ParseResult, urlparse, urlunparse
from urllib.request import Request, urlopen
+# Preserve error in namespace in case a user imported it from PyShp
+StructError = error
+
# Create named logger
logger = logging.getLogger(__name__)
@@ -1135,7 +1140,7 @@ class _CanHaveBBox(Shape):
raise ShapefileException(f"Four numbers required for bbox. Got: {bbox}")
try:
return b_io.write(pack("<4d", *bbox))
- except error:
+ except StructError:
raise ShapefileException(
f"Failed to write bounding box for record {i}. Expected floats."
)
@@ -1165,7 +1170,7 @@ class _CanHaveBBox(Shape):
x_ys.extend(point[:2])
try:
return b_io.write(pack(f"<{len(x_ys)}d", *x_ys))
- except error:
+ except StructError:
raise ShapefileException(
f"Failed to write points for record {i}. Expected floats."
)
@@ -1335,7 +1340,7 @@ class Point(Shape):
) -> int:
try:
return b_io.write(pack("<2d", x, y))
- except error:
+ except StructError:
raise ShapefileException(
f"Failed to write point for record {i}. Expected floats."
)
@@ -1519,7 +1524,7 @@ class _HasM(_CanHaveBBox):
# Note: missing m values are autoset to NODATA.
try:
num_bytes_written = b_io.write(pack("<2d", *mbox))
- except error:
+ except StructError:
raise ShapefileException(
f"Failed to write measure extremes for record {i}. Expected floats"
)
@@ -1529,7 +1534,7 @@ class _HasM(_CanHaveBBox):
ms_to_encode = [m if m is not None else NODATA for m in ms]
num_bytes_written += b_io.write(pack(f"<{len(ms)}d", *ms_to_encode))
- except error:
+ except StructError:
raise ShapefileException(
f"Failed to write measure values for record {i}. Expected floats"
)
@@ -1569,14 +1574,14 @@ class _HasZ(_CanHaveBBox):
# Note: missing z values are autoset to 0, but not sure if this is ideal.
try:
num_bytes_written = b_io.write(pack("<2d", *zbox))
- except error:
+ except StructError:
raise ShapefileException(
f"Failed to write elevation extremes for record {i}. Expected floats."
)
try:
zs = cast(_HasZ, s).z
num_bytes_written += b_io.write(pack(f"<{len(zs)}d", *zs))
- except error:
+ except StructError:
raise ShapefileException(
f"Failed to write elevation values for record {i}. Expected floats."
)
@@ -1644,7 +1649,7 @@ class PointM(Point):
self,
x: float,
y: float,
- # same default as in Writer.__shpRecord (if s.shapeType in (11, 21):)
+ # same default as in Writer._shp_record (if s.shapeType in (11, 21):)
# PyShp encodes None m values as NODATA
m: float | None = None,
oid: int | None = None,
@@ -1672,7 +1677,7 @@ class PointM(Point):
try:
s = cast(_HasM, s)
m = s.m[0] if s.m else None
- except error:
+ except StructError:
raise ShapefileException(
f"Failed to write measure value for record {i}. Expected floats."
)
@@ -1802,7 +1807,7 @@ class PointZ(PointM):
):
Shape.__init__(self, points=[(x, y)], z=(z,), m=(m,), oid=oid)
- # same default as in Writer.__shpRecord (if s.shapeType == 11:)
+ # same default as in Writer._shp_record (if s.shapeType == 11:)
z: Sequence[float] = (0.0,)
@staticmethod
@@ -1820,7 +1825,7 @@ class PointZ(PointM):
try:
if s.z:
z = s.z[0]
- except error:
+ except StructError:
raise ShapefileException(
f"Failed to write elevation value for record {i}. Expected floats."
)
@@ -2195,965 +2200,1106 @@ class ShapeRecords(list[ShapeRecord]):
)
-class ShapefileException(Exception):
- """An exception to handle shapefile specific problems."""
+def _save_to_named_tmp_file(
+ bytes_stream: ReadableBinStream,
+ initial_bytes: bytes = b"",
+ suffix: str | None = None,
+) -> tempfile._TemporaryFileWrapper[bytes]:
+ """Write stream to a read+write tempfile.
+ Gets deleted when garbage collected.
+ """
+ tmp_file_obj = tempfile.NamedTemporaryFile(mode="w+b", suffix=suffix, delete=True)
+ if initial_bytes:
+ tmp_file_obj.write(initial_bytes)
+ tmp_file_obj.write(bytes_stream.read())
+ tmp_file_obj.seek(0)
+ return tmp_file_obj
+
+
+HTML_SIGNATURES_UC = (
+ b"<!DOCTYPE",
+ b"<HTML",
+ b"<HEAD",
+ b"<BODY",
+)
-class _NoShpSentinel:
- """For use as a default value for shp to preserve the
- behaviour (from when all keyword args were gathered
- in the **kwargs dict) in case someone explictly
- called Reader(shp=None) to load self.shx.
- """
+class UnsuccessfulFileDownload(Warning): ...
-_NO_SHP_SENTINEL = _NoShpSentinel()
+SUPPORTED_URL_SCHEMES = ("http", "https") # must be lower case
+DEFAULT_USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36"
-class Reader:
- """Reads the three files of a shapefile as a unit or
- separately. If one of the three files (.shp, .shx,
- .dbf) is missing no exception is thrown until you try
- to call a method that depends on that particular file.
- The .shx index file is used if available for efficiency
- but is not required to read the geometry from the .shp
- file. The "shapefile" argument in the constructor is the
- name of the file you want to open, and can be the path
- to a shapefile on a local filesystem, inside a zipfile,
- or a url.
+@overload
+def _try_to_download_binary_file(
+ urlinfo: ParseResult,
+) -> tuple[bytes, ReadableBinStream]: ...
+@overload
+def _try_to_download_binary_file(
+ urlinfo: ParseResult,
+ ext: str | None,
+ suppress_http_errors: bool,
+ user_agent: str,
+) -> tuple[bytes, ReadableBinStream | None]: ...
+@overload
+def _try_to_download_binary_file(
+ urlinfo: ParseResult,
+ ext: str | None,
+ suppress_http_errors: bool,
+) -> tuple[bytes, ReadableBinStream | None]: ...
+def _try_to_download_binary_file(
+ urlinfo: ParseResult,
+ ext: str | None = None,
+ suppress_http_errors: bool = False,
+ user_agent: str = DEFAULT_USER_AGENT,
+) -> tuple[bytes, ReadableBinStream | None]:
+ """Tries to open a parsed url and download a file served from it.
+ Warns if Content-Type is html and if bytes look like html
+ """
- You can instantiate a Reader without specifying a shapefile
- and then specify one later with the load() method.
+ if ext is not None:
+ urlpath, _ = os.path.splitext(
+ urlinfo.path
+ ) # Removes e.g. ".shp", including the "."
+ urlinfo = urlinfo._replace(path=f"{urlpath}.{ext}")
+
+ url = urlunparse(urlinfo)
+
+ req = Request(
+ url,
+ headers={
+ "User-agent": user_agent,
+ },
+ # Don't enforce method="GET", let urllib pick
+ # whichever defaults it thinks are best,
+ # to allow possible future
+ # support for shapefiles via ftp or on local network addresses.
+ )
- Only the shapefile headers are read upon loading. Content
- within each file is only accessed when required and as
- efficiently as possible. Shapefiles are usually not large
- but they can be.
+ try:
+ resp = urlopen(req)
+ except HTTPError as e:
+ msg = f"{e.msg}, {e.code} occurred when trying to open: {url}, reason: {e.reason}. "
+ if not suppress_http_errors:
+ e.msg = msg # Add helpful info to the default abrupt 404 message.
+ raise e
+ elif ext != ".shx":
+ # Technically the .shx is required for an ESRI Shapefile,
+ # but it's not needed for PyShp, it only contains indices of shapes.
+ warnings.warn(msg, category=UnsuccessfulFileDownload)
+ return b"", None
+
+ content_type = resp.headers.get("Content-Type", "")
+ if "text/html" in content_type:
+ msg = f"Server returned HTML Content-Type: {content_type})"
+
+ # It is preferable not to add special cases for every possible
+ # hosting service, but Github is a frequent source of frustration
+ # in our own tests, and there has literally been an issue open for
+ # over a year to locate a shapefile downloadable from elsewhere
+ # that nobody has yet answered. So if someone requests support
+ # for another service hosting a public shapefile, at least that
+ # issue can finally be closed (and James can delete his Github
+ # test data repo).
+ if urlinfo.netloc.lower().endswith("github.com"):
+ msg = f'{msg}\nAppend "?raw=true" after the file name to download from Github repos. '
+ warnings.warn(msg, category=UnsuccessfulFileDownload)
+ return b"", None
+
+ initial_bytes = resp.read(40)
+ if initial_bytes.upper().startswith(HTML_SIGNATURES_UC):
+ msg = f"Response body appears to be HTML despite Content-Type: '{content_type}'"
+ warnings.warn(msg, category=UnsuccessfulFileDownload)
+
+ # All PyShp cares about is that the response has a .read method
+ # that returns bytes. But at the cost of importing http.client
+ # we could type this stricter as tuple[bytes, HTTPResponse]:
+ # "For HTTP and HTTPS URLs, this function returns a
+ # http.client.HTTPResponse object slightly modified."
+ return initial_bytes, cast(ReadableBinStream, resp)
+
+
+def _try_get_open_constituent_file(
+ shapefile_name: str,
+ ext: Literal["shp", "shx", "dbf"],
+) -> IO[bytes] | None:
"""
+ Attempts to open a .shp, .dbf or .shx file,
+ with both lower case and upper case file extensions,
+ and return it. If it was not possible to open the file, None is returned.
+ """
+ # typing.LiteralString is only available from Python 3.11 onwards.
+ # https://docs.python.org/3/library/typing.html#typing.LiteralString
+ # assert ext in {'shp', 'dbf', 'shx'}
+
+ exts = {ext, ext.upper(), ext.lower()}
+
+ for candidate_ext in exts:
+ try:
+ return open(f"{shapefile_name}.{candidate_ext}", "rb")
+ except OSError:
+ pass
+ return None
- CONSTITUENT_FILE_EXTS = ["shp", "shx", "dbf"]
- assert all(ext.islower() for ext in CONSTITUENT_FILE_EXTS)
- def _assert_ext_is_supported(self, ext: str) -> None:
- assert ext in self.CONSTITUENT_FILE_EXTS
+def ensure_within_bounds(i: int, num_records: int) -> int:
+ """Provides list-like handling of a record index with a clearer
+ error message if the index is out of bounds."""
+ rmax = num_records - 1
+ if abs(i) > rmax:
+ raise IndexError(f"Shape or Record index: {i} out of range. Max index: {rmax}")
+ if i < 0:
+ i = range(num_records)[i]
+ return i
+
+
+class DbfReader:
+ """Reads a dbf file. You can instantiate a DbfReader without specifying a shapefile
+ and then specify one later with the load() method.
+ """
def __init__(
self,
- shapefile_path: str | PathLike[Any] = "",
- /,
*,
+ file_obj: IO[bytes],
encoding: str = "utf-8",
encodingErrors: str = "strict",
- shp: _NoShpSentinel | BinaryFileT | None = _NO_SHP_SENTINEL,
- shx: BinaryFileT | None = None,
- dbf: BinaryFileT | None = None,
- # Keep kwargs even though unused, to preserve PyShp 2.4 API
- **kwargs: Any,
):
- self.shp = None
- self.shx = None
- self.dbf = None
- self._files_to_close: list[BinaryFileStreamT] = []
- self.shapeName = "Not specified"
- self._offsets: list[int] = []
- self.shpLength: int | None = None
- self.numRecords: int | None = None
- self.numShapes: int | None = None
+ self._file = file_obj
self.fields: list[Field] = []
- self.__dbfHdrLength = 0
self.__fieldLookup: dict[str, int] = {}
self.encoding = encoding
self.encodingErrors = encodingErrors
- # See if a shapefile name was passed as the first argument
- if shapefile_path:
- path = fsdecode_if_pathlike(shapefile_path)
- if isinstance(path, str):
- if ".zip" in path:
- # Shapefile is inside a zipfile
- if path.count(".zip") > 1:
- # Multiple nested zipfiles
- raise ShapefileException(
- f"Reading from multiple nested zipfiles is not supported: {path}"
- )
- # Split into zipfile and shapefile paths
- if path.endswith(".zip"):
- zpath = path
- shapefile = None
- else:
- zpath = path[: path.find(".zip") + 4]
- shapefile = path[path.find(".zip") + 4 + 1 :]
- zipfileobj: (
- tempfile._TemporaryFileWrapper[bytes] | io.BufferedReader
- )
- # Create a zip file handle
- if zpath.startswith("http"):
- # Zipfile is from a url
- # Download to a temporary url and treat as normal zipfile
- req = Request(
- zpath,
- headers={
- "User-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36"
- },
- )
- resp = urlopen(req)
- # write zipfile data to a read+write tempfile and use as source, gets deleted when garbage collected
- zipfileobj = tempfile.NamedTemporaryFile(
- mode="w+b", suffix=".zip", delete=True
- )
- zipfileobj.write(resp.read())
- zipfileobj.seek(0)
- else:
- # Zipfile is from a file
- zipfileobj = open(zpath, mode="rb")
- # Open the zipfile archive
- with zipfile.ZipFile(zipfileobj, "r") as archive:
- if not shapefile:
- # Only the zipfile path is given
- # Inspect zipfile contents to find the full shapefile path
- shapefiles = [
- name
- for name in archive.namelist()
- if (name.endswith(".SHP") or name.endswith(".shp"))
- ]
- # The zipfile must contain exactly one shapefile
- if len(shapefiles) == 0:
- raise ShapefileException(
- "Zipfile does not contain any shapefiles"
- )
- if len(shapefiles) == 1:
- shapefile = shapefiles[0]
- else:
- raise ShapefileException(
- f"Zipfile contains more than one shapefile: {shapefiles}. "
- "Please specify the full path to the shapefile you would like to open."
- )
- # Try to extract file-like objects from zipfile
- shapefile = os.path.splitext(shapefile)[
- 0
- ] # root shapefile name
- for lower_ext in self.CONSTITUENT_FILE_EXTS:
- for cased_ext in [lower_ext, lower_ext.upper()]:
- try:
- member = archive.open(f"{shapefile}.{cased_ext}")
- # write zipfile member data to a read+write tempfile and use as source, gets deleted on close()
- fileobj = tempfile.NamedTemporaryFile(
- mode="w+b", delete=True
- )
- fileobj.write(member.read())
- fileobj.seek(0)
- setattr(self, lower_ext, fileobj)
- self._files_to_close.append(fileobj)
- except (OSError, AttributeError, KeyError):
- pass
- # Close and delete the temporary zipfile
- try:
- zipfileobj.close()
- # TODO Does catching all possible exceptions really increase
- # the chances of closing the zipfile successully, or does it
- # just mean .close() failures will still fail, but fail
- # silently?
- except: # noqa: E722
- pass
- # Try to load shapefile
- if self.shp or self.dbf:
- # Load and exit early
- self.load()
- return
+ self._dbfHeader()
- raise ShapefileException(
- f"No shp or dbf file found in zipfile: {path}"
- )
+ @property
+ def dbf(self) -> IO[bytes]:
+ if not self._file:
+ raise dbfFileException(
+ f"DbfReader requires a .dbf file or file-like object. Got: {self._file}"
+ )
+ return self._file
- if path.startswith("http"):
- # Shapefile is from a url
- # Download each file to temporary path and treat as normal shapefile path
- urlinfo = urlparse(path)
- urlpath = urlinfo[2]
- urlpath, _ = os.path.splitext(urlpath)
- shapefile = os.path.basename(urlpath)
- for ext in ["shp", "shx", "dbf"]:
- try:
- _urlinfo = list(urlinfo)
- _urlinfo[2] = urlpath + "." + ext
- _path = urlunparse(_urlinfo)
- req = Request(
- _path,
- headers={
- "User-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36"
- },
- )
- resp = urlopen(req)
- # write url data to a read+write tempfile and use as source, gets deleted on close()
- fileobj = tempfile.NamedTemporaryFile(
- mode="w+b", delete=True
- )
- fileobj.write(resp.read())
- fileobj.seek(0)
- setattr(self, ext, fileobj)
- self._files_to_close.append(fileobj)
- except HTTPError:
- pass
- if self.shp or self.dbf:
- # Load and exit early
- self.load()
- return
-
- raise ShapefileException(f"No shp or dbf file found at url: {path}")
+ def __len__(self) -> int:
+ """Returns the number of records in the .dbf file."""
- # Local file path to a shapefile
- # Load and exit early
- self.load(path)
- return
+ return self.numRecords
- if shp is not _NO_SHP_SENTINEL:
- shp = cast(Union[str, PathLike[Any], IO[bytes], None], shp)
- self.shp = self.__seek_0_on_file_obj_wrap_or_open_from_name("shp", shp)
- self.shx = self.__seek_0_on_file_obj_wrap_or_open_from_name("shx", shx)
+ def _dbfHeader(self) -> None:
+ """Reads a dbf header. Xbase-related code borrows heavily from ActiveState Python Cookbook Recipe 362715 by Raymond Hettinger"""
- self.dbf = self.__seek_0_on_file_obj_wrap_or_open_from_name("dbf", dbf)
+ dbf = self.dbf
+ # read relevant header parts
+ dbf.seek(0)
+ self.numRecords, self.__dbfHdrLength, self._record_length = cast(
+ tuple[int, int, int], unpack("<xxxxLHH20x", dbf.read(32))
+ )
- # Load the files
- if self.shp or self.dbf:
- self._try_to_set_constituent_file_headers()
+ # read fields
+ numFields = (self.__dbfHdrLength - 33) // 32
+ for __field in range(numFields):
+ encoded_field_tuple: tuple[bytes, bytes, int, int] = unpack(
+ "<11sc4xBB14x", dbf.read(32)
+ )
+ encoded_name, encoded_type_char, size, decimal = encoded_field_tuple
- def __seek_0_on_file_obj_wrap_or_open_from_name(
- self,
- ext: str,
- file_: BinaryFileT | None,
- ) -> None | IO[bytes]:
- # assert ext in {'shp', 'dbf', 'shx'}
- self._assert_ext_is_supported(ext)
+ if b"\x00" in encoded_name:
+ idx = encoded_name.index(b"\x00")
+ else:
+ idx = len(encoded_name) - 1
+ encoded_name = encoded_name[:idx]
+ name = encoded_name.decode(self.encoding, self.encodingErrors)
+ name = name.lstrip()
- if file_ is None:
- return None
+ field_type = FIELD_TYPE_ALIASES[encoded_type_char]
- if isinstance(file_, (str, PathLike)):
- baseName, __ = os.path.splitext(file_)
- return self._load_constituent_file(baseName, ext)
+ self.fields.append(Field(name, field_type, size, decimal))
+ terminator = dbf.read(1)
+ if terminator != b"\r":
+ raise ShapefileException(
+ "Shapefile dbf header lacks expected terminator. (likely corrupt?)"
+ )
- if hasattr(file_, "read"):
- # Copy if required
- try:
- file_.seek(0)
- return file_
- except (NameError, io.UnsupportedOperation):
- return io.BytesIO(file_.read())
+ # insert deletion field at start
+ self.fields.insert(0, Field("DeletionFlag", FieldType.C, 1, 0))
- raise ShapefileException(
- f"Could not load shapefile constituent file from: {file_}"
- )
+ # store all field positions for easy lookups
+ # note: fieldLookup gives the index position of a field inside Reader.fields
+ self.__fieldLookup = {f[0]: i for i, f in enumerate(self.fields)}
- def __str__(self) -> str:
- """
- Use some general info on the shapefile as __str__
- """
- info = ["shapefile Reader"]
- if self.shp:
- info.append(
- f" {len(self)} shapes (type '{SHAPETYPE_LOOKUP[self.shapeType]}')"
- )
- if self.dbf:
- info.append(f" {len(self)} records ({len(self.fields)} fields)")
- return "\n".join(info)
+ # by default, read all fields except the deletion flag, hence "[1:]"
+ # note: recLookup gives the index position of a field inside a _Record list
+ fieldnames = [f[0] for f in self.fields[1:]]
+ __fieldTuples, recLookup, recStruct = self._record_fields(fieldnames)
+ self.__fullRecStruct = recStruct
+ self.__fullRecLookup = recLookup
- def __enter__(self) -> Reader:
+ def _record_fmt(self, fields: Container[str] | None = None) -> tuple[str, int]:
+ """Calculates the format and size of a .dbf record. Optional 'fields' arg
+ specifies which fieldnames to unpack and which to ignore. Note that this
+ always includes the DeletionFlag at index 0, regardless of the 'fields' arg.
"""
- Enter phase of context manager.
+ structcodes = [f"{fieldinfo.size}s" for fieldinfo in self.fields]
+ if fields is not None:
+ # only unpack specified fields, ignore others using padbytes (x)
+ structcodes = [
+ code
+ if fieldinfo.name in fields
+ or fieldinfo.name == "DeletionFlag" # always unpack delflag
+ else f"{fieldinfo.size}x"
+ for fieldinfo, code in zip(self.fields, structcodes)
+ ]
+ fmt = "".join(structcodes)
+ fmt_size = calcsize(fmt)
+ # total size of fields should add up to recordlength from the header
+ while fmt_size < self._record_length:
+ # if not, pad byte until reaches recordlength
+ fmt += "x"
+ fmt_size += 1
+ return (fmt, fmt_size)
+
+ def _record_fields(
+ self, fields: Iterable[str] | None = None
+ ) -> tuple[list[Field], dict[str, int], Struct]:
+ """Returns the necessary info required to unpack a record's fields,
+ restricted to a subset of fieldnames 'fields' if specified.
+ Returns a list of field info tuples, a name-index lookup dict,
+ and a Struct instance for unpacking these fields. Note that DeletionFlag
+ is not a valid field.
"""
- return self
+ if fields is not None:
+ # restrict info to the specified fields
+ # first ignore repeated field names (order doesn't matter)
+ unique_fields = list(set(fields))
+ # get the struct
+ fmt, __fmt_size = self._record_fmt(fields=unique_fields)
+ recStruct = Struct(fmt)
+ # make sure the given fieldnames exist
+ for name in unique_fields:
+ if name not in self.__fieldLookup or name == "DeletionFlag":
+ raise ValueError(f'"{name}" is not a valid field name')
+ # fetch relevant field info tuples
+ fieldTuples = []
+ for fieldinfo in self.fields[1:]:
+ name = fieldinfo[0]
+ if name in unique_fields:
+ fieldTuples.append(fieldinfo)
+ # store the field positions
+ recLookup = {f[0]: i for i, f in enumerate(fieldTuples)}
+ else:
+ # use all the dbf fields
+ fieldTuples = self.fields[1:] # sans deletion flag
+ recStruct = self.__fullRecStruct
+ recLookup = self.__fullRecLookup
+ return fieldTuples, recLookup, recStruct
- # def __exit__(self, exc_type, exc_val, exc_tb) -> None:
- def __exit__(
+ def _record(
self,
- exc_type: BaseException | None,
- exc_val: BaseException | None,
- exc_tb: TracebackType | None,
- ) -> bool | None:
- """
- Exit phase of context manager, close opened files.
+ fieldTuples: list[Field],
+ recLookup: dict[str, int],
+ recStruct: Struct,
+ oid: int | None = None,
+ ) -> _Record | None:
+ """Reads and returns a dbf record row as a list of values. Requires specifying
+ a list of field info Field namedtuples 'fieldTuples', a record name-index dict 'recLookup',
+ and a Struct instance 'recStruct' for unpacking these fields.
"""
- self.close()
- return None
-
- def __len__(self) -> int:
- """Returns the number of shapes/records in the shapefile."""
- if self.dbf:
- # Preferably use dbf record count
- if self.numRecords is None:
- self.__dbfHeader()
+ f = self.dbf
- # .__dbfHeader sets self.numRecords or raises Exception
- return cast(int, self.numRecords)
+ # The only format chars in from self._record_fmt, in recStruct from _record_fields,
+ # are s and x (ascii encoded str and pad byte) so everything in recordContents is bytes
+ # https://docs.python.org/3/library/struct.html#format-characters
+ recordContents = recStruct.unpack(f.read(recStruct.size))
- if self.shp:
- # Otherwise use shape count
- if self.shx:
- if self.numShapes is None:
- self.__shxHeader()
+ # deletion flag field is always unpacked as first value (see _record_fmt)
+ if recordContents[0] != b" ":
+ # deleted record
+ return None
- # .__shxHeader sets self.numShapes or raises Exception
- return cast(int, self.numShapes)
+ # drop deletion flag from values
+ recordContents = recordContents[1:]
- # Index file not available, iterate all shapes to get total count
- if self.numShapes is None:
- # Determine length of shp file
- shp = self.shp
- checkpoint = shp.tell()
- shp.seek(0, 2)
- shpLength = shp.tell()
- shp.seek(100)
- # Do a fast shape iteration until end of file.
- offsets = []
- pos = shp.tell()
- while pos < shpLength:
- offsets.append(pos)
- # Unpack the shape header only
- (__recNum, recLength) = unpack_2_int32_be(shp.read(8))
- # Jump to next shape position
- pos += 8 + (2 * recLength)
- shp.seek(pos)
- # Set numShapes and offset indices
- self.numShapes = len(offsets)
- self._offsets = offsets
- # Return to previous file position
- shp.seek(checkpoint)
+ # check that values match fields
+ if len(fieldTuples) != len(recordContents):
+ raise ShapefileException(
+ f"Number of record values ({len(recordContents)}) is different from the requested "
+ f"number of fields ({len(fieldTuples)})"
+ )
- return self.numShapes
-
- # No file loaded yet, treat as 'empty' shapefile
- return 0
-
- def __iter__(self) -> Iterator[ShapeRecord]:
- """Iterates through the shapes/records in the shapefile."""
- yield from self.iterShapeRecords()
-
- @property
- def __geo_interface__(self) -> GeoJSONFeatureCollectionWithBBox:
- shaperecords = self.shapeRecords()
- fcollection = GeoJSONFeatureCollectionWithBBox(
- bbox=list(self.bbox),
- **shaperecords.__geo_interface__,
- )
- return fcollection
-
- @property
- def shapeTypeName(self) -> str:
- return SHAPETYPE_LOOKUP[self.shapeType]
-
- def load(self, shapefile: str | None = None) -> None:
- """Opens a shapefile from a filename or file-like
- object. Normally this method would be called by the
- constructor with the file name as an argument."""
- if shapefile:
- (shapeName, __ext) = os.path.splitext(shapefile)
- self.shapeName = shapeName
- self.load_shp(shapeName)
- self.load_shx(shapeName)
- self.load_dbf(shapeName)
- if not (self.shp or self.dbf):
- raise ShapefileException(
- f"Unable to open {shapeName}.dbf or {shapeName}.shp."
- )
- self._try_to_set_constituent_file_headers()
+ # parse each value
+ record = []
+ for (__name, typ, __size, decimal), value in zip(fieldTuples, recordContents):
+ if typ is FieldType.N or typ is FieldType.F:
+ # numeric or float: number stored as a string, right justified, and padded with blanks to the width of the field.
+ value = value.split(b"\0")[0]
+ value = value.replace(b"*", b"") # QGIS NULL is all '*' chars
+ if value == b"":
+ value = None
+ elif decimal:
+ try:
+ value = float(value)
+ except ValueError:
+ # not parseable as float, set to None
+ value = None
+ else:
+ # force to int
+ try:
+ # first try to force directly to int.
+ # forcing a large int to float and back to int
+ # will lose information and result in wrong nr.
+ value = int(value)
+ except ValueError:
+ # forcing directly to int failed, so was probably a float.
+ try:
+ value = int(float(value))
+ except ValueError:
+ # not parseable as int, set to None
+ value = None
+ elif typ is FieldType.D:
+ # date: 8 bytes - date stored as a string in the format YYYYMMDD.
+ if (
+ not value.replace(b"\x00", b"")
+ .replace(b" ", b"")
+ .replace(b"0", b"")
+ ):
+ # dbf date field has no official null value
+ # but can check for all hex null-chars, all spaces, or all 0s (QGIS null)
+ value = None
+ else:
+ try:
+ # return as python date object
+ y, m, d = int(value[:4]), int(value[4:6]), int(value[6:8])
+ value = date(y, m, d)
+ except (TypeError, ValueError):
+ # if invalid date, just return as unicode string so user can decide
+ value = str(value.strip())
+ elif typ is FieldType.L:
+ # logical: 1 byte - initialized to 0x20 (space) otherwise T or F.
+ if value == b" ":
+ value = None # space means missing or not yet set
+ else:
+ if value in b"YyTt1":
+ value = True
+ elif value in b"NnFf0":
+ value = False
+ else:
+ value = None # unknown value is set to missing
+ else:
+ value = value.decode(self.encoding, self.encodingErrors)
+ value = value.strip().rstrip(
+ "\x00"
+ ) # remove null-padding at end of strings
+ record.append(value)
- def _try_to_set_constituent_file_headers(self) -> None:
- if self.shp:
- self.__shpHeader()
- if self.dbf:
- self.__dbfHeader()
- if self.shx:
- self.__shxHeader()
+ return _Record(recLookup, record, oid)
- def _try_get_open_constituent_file(
- self,
- shapefile_name: str,
- ext: str,
- ) -> IO[bytes] | None:
- """
- Attempts to open a .shp, .dbf or .shx file,
- with both lower case and upper case file extensions,
- and return it. If it was not possible to open the file, None is returned.
+ def record(self, i: int = 0, fields: list[str] | None = None) -> _Record | None:
+ """Returns a specific dbf record based on the supplied index.
+ To only read some of the fields, specify the 'fields' arg as a
+ list of one or more fieldnames.
"""
- # typing.LiteralString is only available from PYthon 3.11 onwards.
- # https://docs.python.org/3/library/typing.html#typing.LiteralString
- # assert ext in {'shp', 'dbf', 'shx'}
- self._assert_ext_is_supported(ext)
+ f = self.dbf
- try:
- return open(f"{shapefile_name}.{ext}", "rb")
- except OSError:
- try:
- return open(f"{shapefile_name}.{ext.upper()}", "rb")
- except OSError:
- return None
+ i = ensure_within_bounds(i, self.numRecords)
+ recSize = self._record_length
+ f.seek(0)
+ f.seek(self.__dbfHdrLength + (i * recSize))
+ fieldTuples, recLookup, recStruct = self._record_fields(fields)
+ return self._record(
+ oid=i, fieldTuples=fieldTuples, recLookup=recLookup, recStruct=recStruct
+ )
- def _load_constituent_file(
- self,
- shapefile_name: str,
- ext: str,
- ) -> IO[bytes] | None:
- """
- Attempts to open a .shp, .dbf or .shx file, with the extension
- as both lower and upper case, and if successful append it to
- self._files_to_close.
+ def records(self, fields: list[str] | None = None) -> list[_Record]:
+ """Returns all records in a dbf file.
+ To only read some of the fields, specify the 'fields' arg as a
+ list of one or more fieldnames.
"""
- shp_dbf_or_dhx_file = self._try_get_open_constituent_file(shapefile_name, ext)
- if shp_dbf_or_dhx_file is not None:
- self._files_to_close.append(shp_dbf_or_dhx_file)
- return shp_dbf_or_dhx_file
- def load_shp(self, shapefile_name: str) -> None:
- """
- Attempts to load file with .shp extension as both lower and upper case
- """
- self.shp = self._load_constituent_file(shapefile_name, "shp")
+ records = []
+ self.dbf.seek(self.__dbfHdrLength)
+ fieldTuples, recLookup, recStruct = self._record_fields(fields)
- def load_shx(self, shapefile_name: str) -> None:
- """
- Attempts to load file with .shx extension as both lower and upper case
- """
- self.shx = self._load_constituent_file(shapefile_name, "shx")
+ for i in range(self.numRecords):
+ r = self._record(
+ oid=i, fieldTuples=fieldTuples, recLookup=recLookup, recStruct=recStruct
+ )
+ if r:
+ records.append(r)
+ return records
- def load_dbf(self, shapefile_name: str) -> None:
- """
- Attempts to load file with .dbf extension as both lower and upper case
+ def iterRecords(
+ self,
+ fields: list[str] | None = None,
+ start: int = 0,
+ stop: int | None = None,
+ ) -> Iterator[_Record | None]:
+ """Returns a generator of records in a dbf file.
+ Useful for large shapefiles or dbf files.
+ To only read some of the fields, specify the 'fields' arg as a
+ list of one or more fieldnames.
+ By default yields all records. Otherwise, specify start
+ (default: 0) or stop (default: number_of_records)
+ to only yield record numbers i, where
+ start <= i < stop, (or
+ start <= i < number_of_records + stop
+ if stop < 0).
"""
- self.dbf = self._load_constituent_file(shapefile_name, "dbf")
-
- def __del__(self) -> None:
- self.close()
- def close(self) -> None:
- # Close any files that the reader opened (but not those given by user)
- for attribute in self._files_to_close:
- if hasattr(attribute, "close"):
- try:
- attribute.close()
- except OSError:
- pass
- self._files_to_close = []
-
- def __getFileObj(self, f: T | None) -> T:
- """Checks to see if the requested shapefile file object is
- available. If not a ShapefileException is raised."""
- if not f:
+ if not isinstance(self.numRecords, int):
raise ShapefileException(
- "Shapefile Reader requires a shapefile or file-like object."
+ "Error when reading number of Records in dbf file header"
)
- if self.shp and self.shpLength is None:
- self.load()
- if self.dbf and len(self.fields) == 0:
- self.load()
- return f
-
- def __restrictIndex(self, i: int) -> int:
- """Provides list-like handling of a record index with a clearer
- error message if the index is out of bounds."""
- if self.numRecords:
- rmax = self.numRecords - 1
- if abs(i) > rmax:
- raise IndexError(
- f"Shape or Record index: {i} out of range. Max index: {rmax}"
- )
- if i < 0:
- i = range(self.numRecords)[i]
- return i
-
- def __shpHeader(self) -> None:
- """Reads the header information from a .shp file."""
- if not self.shp:
- raise ShapefileException(
- "Shapefile Reader requires a shapefile or file-like object. (no shp file found"
+ start = ensure_within_bounds(start, self.numRecords)
+ if stop is None:
+ stop = self.numRecords
+ elif abs(stop) > self.numRecords:
+ raise IndexError(
+ f"abs(stop): {abs(stop)} exceeds number of records: {self.numRecords}."
)
+ elif stop < 0:
+ stop = range(self.numRecords)[stop]
+ recSize = self._record_length
+ self.dbf.seek(self.__dbfHdrLength + (start * recSize))
+ fieldTuples, recLookup, recStruct = self._record_fields(fields)
+ for i in range(start, stop):
+ r = self._record(
+ oid=i, fieldTuples=fieldTuples, recLookup=recLookup, recStruct=recStruct
+ )
+ if r:
+ yield r
- shp = self.shp
- # File length (16-bit word * 2 = bytes)
- shp.seek(24)
- self.shpLength = unpack(">i", shp.read(4))[0] * 2
- # Shape type
- shp.seek(32)
- self.shapeType = unpack("<i", shp.read(4))[0]
- # The shapefile's bounding box (lower left, upper right)
- # self.bbox: BBox = tuple(_Array("d", unpack("<4d", shp.read(32))))
- self.bbox: BBox = unpack("<4d", shp.read(32))
- # xmin, ymin, xmax, ymax = unpack("<4d", shp.read(32))
- # self.bbox = BBox(xmin=xmin, ymin=ymin, xmax=xmax, ymax=ymax)
- # Elevation
- # self.zbox: ZBox = tuple(_Array("d", unpack("<2d", shp.read(16))))
- self.zbox: ZBox = unpack("<2d", shp.read(16))
- # zmin, zmax = unpack("<2d", shp.read(16))
- # self.zbox = ZBox(zmin=zmin, zmax=zmax)
- # Measure
- # Measure values less than -10e38 are nodata values according to the spec
- m_bounds = [
- float(m_bound) if m_bound >= NODATA else None
- for m_bound in unpack("<2d", shp.read(16))
- ]
- # self.mbox = MBox(mmin=m_bounds[0], mmax=m_bounds[1])
- self.mbox: tuple[float | None, float | None] = (m_bounds[0], m_bounds[1])
-
- def __shape(self, oid: int | None = None, bbox: BBox | None = None) -> Shape | None:
- """Returns the header info and geometry for a single shape."""
- f = self.__getFileObj(self.shp)
+class ShapefileException(Exception):
+ """An exception to handle shapefile specific problems."""
- # shape = Shape(oid=oid)
- (__recNum, recLength) = unpack_2_int32_be(f.read(8))
- # Determine the start of the next record
- # Convert from num of 16 bit words, to 8 bit bytes
- recLength_bytes = 2 * recLength
+class dbfFileException(ShapefileException):
+ """Indicates a problem with the .dbf file."""
- # next_shape = f.tell() + recLength_bytes
- # Read entire record into memory to avoid having to call
- # seek on the file afterwards
- b_io: ReadSeekableBinStream = io.BytesIO(f.read(recLength_bytes))
- b_io.seek(0)
+class _NoShpSentinel:
+ """For use as a default value for shp to preserve the
+ behaviour (from when all keyword args were gathered
+ in the **kwargs dict) in case someone explicitly
+ called Reader(shp=None) to load self.shx.
+ """
- shapeType = unpack("<i", b_io.read(4))[0]
- ShapeClass = SHAPE_CLASS_FROM_SHAPETYPE[shapeType]
- shape = ShapeClass.from_byte_stream(
- shapeType, b_io, recLength_bytes, oid=oid, bbox=bbox
- )
+_NO_SHP_SENTINEL = _NoShpSentinel()
- # Seek to the end of this record as defined by the record header because
- # the shapefile spec doesn't require the actual content to meet the header
- # definition. Probably allowed for lazy feature deletion.
- # f.seek(next_shape)
- return shape
+class Reader:
+ """Reads the three files of a shapefile as a unit or
+ separately. If one of the three files (.shp, .shx,
+ .dbf) is missing no exception is thrown until you try
+ to call a method that depends on that particular file.
+ The .shx index file is used if available for efficiency
+ but is not required to read the geometry from the .shp
+ file. The "shapefile" argument in the constructor is the
+ name of the file you want to open, and can be the path
+ to a shapefile on a local filesystem, inside a zipfile,
+ or a url.
- def __shxHeader(self) -> None:
- """Reads the header information from a .shx file."""
- shx = self.shx
- if not shx:
- raise ShapefileException(
- "Shapefile Reader requires a shapefile or file-like object. (no shx file found"
- )
- # File length (16-bit word * 2 = bytes) - header length
- shx.seek(24)
- shxRecordLength = (unpack(">i", shx.read(4))[0] * 2) - 100
- self.numShapes = shxRecordLength // 8
+ You can instantiate a Reader without specifying a shapefile
+ and then specify one later with the load() method.
- def __shxOffsets(self) -> None:
- """Reads the shape offset positions from a .shx file"""
- shx = self.shx
- if not shx:
- raise ShapefileException(
- "Shapefile Reader requires a shapefile or file-like object. (no shx file found"
- )
- if self.numShapes is None:
- raise ShapefileException(
- "numShapes must not be None. "
- " Was there a problem with .__shxHeader() ?"
- f"Got: {self.numShapes=}"
- )
- # Jump to the first record.
- shx.seek(100)
- # Each index record consists of two nrs, we only want the first one
- shxRecords = _Array[int]("i", shx.read(2 * self.numShapes * 4))
- if sys.byteorder != "big":
- shxRecords.byteswap()
- self._offsets = [2 * el for el in shxRecords[::2]]
-
- def __shapeIndex(self, i: int | None = None) -> int | None:
- """Returns the offset in a .shp file for a shape based on information
- in the .shx index file."""
- shx = self.shx
- # Return None if no shx or no index requested
- if not shx or i is None:
- return None
- # At this point, we know the shx file exists
- if not self._offsets:
- self.__shxOffsets()
- return self._offsets[i]
+ Only the shapefile headers are read upon loading. Content
+ within each file is only accessed when required and as
+ efficiently as possible. Shapefiles are usually not large
+ but they can be.
+ """
- def shape(self, i: int = 0, bbox: BBox | None = None) -> Shape | None:
- """Returns a shape object for a shape in the geometry
- record file.
- If the 'bbox' arg is given (list or tuple of xmin,ymin,xmax,ymax),
- returns None if the shape is not within that region.
- """
- shp = self.__getFileObj(self.shp)
- i = self.__restrictIndex(i)
- offset = self.__shapeIndex(i)
- if not offset:
- # Shx index not available.
- # Determine length of shp file
- shp.seek(0, 2)
- shpLength = shp.tell()
- shp.seek(100)
- # Do a fast shape iteration until the requested index or end of file.
- _i = 0
- offset = shp.tell()
- while offset < shpLength:
- if _i == i:
- # Reached the requested index, exit loop with the offset value
- break
- # Unpack the shape header only
- (__recNum, recLength) = unpack_2_int32_be(shp.read(8))
- # Jump to next shape position
- offset += 8 + (2 * recLength)
- shp.seek(offset)
- _i += 1
- # If the index was not found, it likely means the .shp file is incomplete
- if _i != i:
- raise ShapefileException(
- f"Shape index {i} is out of bounds; the .shp file only contains {_i} shapes"
- )
+ def __init__(
+ self,
+ shapefile_path: str | PathLike[Any] = "",
+ /,
+ *,
+ encoding: str = "utf-8",
+ encodingErrors: str = "strict",
+ shp: _NoShpSentinel | BinaryFileT | None = _NO_SHP_SENTINEL,
+ shx: BinaryFileT | None = None,
+ dbf: BinaryFileT | None = None,
+ # Keep kwargs even though unused, to preserve PyShp 2.4 API
+ **kwargs: Any,
+ ):
+ self.encoding = encoding
+ self.encodingErrors = encodingErrors
+ self._shp = None
+ self._shx = None
+ self._dbf = None
+ self.shapeName = "Not specified"
+ self._offsets: list[int] = []
+ self.shpLength: int | None = None
+ self.numShapes: int | None = None
+ self._exit_stack = ExitStack()
+ # See if a shapefile name was passed as the first argument
+ if shapefile_path:
+ path = fsdecode_if_pathlike(shapefile_path)
+ self.path = path
+ if isinstance(path, str):
+ if ".zip" in path:
+ self._load_from_zip(path)
+ # Raises if not self._shp or self._dbf
+ return
- # Seek to the offset and read the shape
- shp.seek(offset)
- return self.__shape(oid=i, bbox=bbox)
+ if path.lower().startswith(SUPPORTED_URL_SCHEMES):
+ self._load_from_url(path)
+ # Raises if not self._shp or self._dbf
+ return
- def shapes(self, bbox: BBox | None = None) -> Shapes:
- """Returns all shapes in a shapefile.
- To only read shapes within a given spatial region, specify the 'bbox'
- arg as a list or tuple of xmin,ymin,xmax,ymax.
- """
- shapes = Shapes()
- shapes.extend(self.iterShapes(bbox=bbox))
- return shapes
+ # Local file path to a shapefile
+ # Load and exit early
+ self.load(path)
- def iterShapes(self, bbox: BBox | None = None) -> Iterator[Shape | None]:
- """Returns a generator of shapes in a shapefile. Useful
- for handling large shapefiles.
- To only read shapes within a given spatial region, specify the 'bbox'
- arg as a list or tuple of xmin,ymin,xmax,ymax.
- """
- shp = self.__getFileObj(self.shp)
- # Found shapefiles which report incorrect
- # shp file length in the header. Can't trust
- # that so we seek to the end of the file
- # and figure it out.
- shp.seek(0, 2)
- shpLength = shp.tell()
- shp.seek(100)
+ # Raises if not self._shp or self._dbf
+ return
- if self.numShapes:
- # Iterate exactly the number of shapes from shx header
- for i in range(self.numShapes):
- # MAYBE: check if more left of file or exit early?
- shape = self.__shape(oid=i, bbox=bbox)
- if shape:
- yield shape
- else:
- # No shx file, unknown nr of shapes
- # Instead iterate until reach end of file
- # Collect the offset indices during iteration
- i = 0
- offsets = []
- pos = shp.tell()
- while pos < shpLength:
- offsets.append(pos)
- shape = self.__shape(oid=i, bbox=bbox)
- pos = shp.tell()
- if shape:
- yield shape
- i += 1
- # Entire shp file consumed
- # Update the number of shapes and list of offsets
- assert i == len(offsets)
- self.numShapes = i
- self._offsets = offsets
+ if shp is not _NO_SHP_SENTINEL:
+ shp = cast(Union[BinaryFileT, None], shp)
+ self._shp = self._seek_0_on_file_obj_wrap_or_open_from_name("shp", shp)
+ self._shx = self._seek_0_on_file_obj_wrap_or_open_from_name("shx", shx)
- def __dbfHeader(self) -> None:
- """Reads a dbf header. Xbase-related code borrows heavily from ActiveState Python Cookbook Recipe 362715 by Raymond Hettinger"""
+ self._dbf = self._seek_0_on_file_obj_wrap_or_open_from_name("dbf", dbf)
- if not self.dbf:
+ # Load the files
+ if self._shp:
+ self._shpHeader()
+ if self._dbf:
+ self._get_dbf_reader()
+ if self._shx:
+ self._shxHeader()
+
+ @functools.cache
+ def _get_dbf_reader(self) -> DbfReader:
+ if self._dbf is None:
raise ShapefileException(
- "Shapefile Reader requires a shapefile or file-like object. (no dbf file found)"
+ "Shapefile DbfReader requires a .dbf file or file-like object."
)
- dbf = self.dbf
- # read relevant header parts
- dbf.seek(0)
- self.numRecords, self.__dbfHdrLength, self.__recordLength = unpack(
- "<xxxxLHH20x", dbf.read(32)
+ return DbfReader(
+ file_obj=self._dbf,
+ encoding=self.encoding,
+ encodingErrors=self.encodingErrors,
)
- # read fields
- numFields = (self.__dbfHdrLength - 33) // 32
- for __field in range(numFields):
- encoded_field_tuple: tuple[bytes, bytes, int, int] = unpack(
- "<11sc4xBB14x", dbf.read(32)
- )
- encoded_name, encoded_type_char, size, decimal = encoded_field_tuple
-
- if b"\x00" in encoded_name:
- idx = encoded_name.index(b"\x00")
- else:
- idx = len(encoded_name) - 1
- encoded_name = encoded_name[:idx]
- name = encoded_name.decode(self.encoding, self.encodingErrors)
- name = name.lstrip()
+ @property
+ def dbf_reader(self) -> DbfReader:
+ return self._get_dbf_reader()
- field_type = FIELD_TYPE_ALIASES[encoded_type_char]
+ @functools.cached_property
+ def shp(self) -> IO[bytes]:
+ if self._shp is None:
+ raise ShapefileException(
+ "Shapefile Reader requires a .shp shapefile or file-like object."
+ )
+ return self._shp
- self.fields.append(Field(name, field_type, size, decimal))
- terminator = dbf.read(1)
- if terminator != b"\r":
+ @functools.cached_property
+ def shx(self) -> IO[bytes]:
+ if self._shx is None:
raise ShapefileException(
- "Shapefile dbf header lacks expected terminator. (likely corrupt?)"
+ "Shapefile Reader shx use requires a .shx shapefile or file-like object."
)
+ return self._shx
- # insert deletion field at start
- self.fields.insert(0, Field("DeletionFlag", FieldType.C, 1, 0))
+ @property
+ def dbf(self) -> IO[bytes]:
+ return self.dbf_reader.dbf
- # store all field positions for easy lookups
- # note: fieldLookup gives the index position of a field inside Reader.fields
- self.__fieldLookup = {f[0]: i for i, f in enumerate(self.fields)}
+ @property
+ def numRecords(self) -> int | None:
+ if self._dbf is None:
+ return None
+ return self.dbf_reader.numRecords
- # by default, read all fields except the deletion flag, hence "[1:]"
- # note: recLookup gives the index position of a field inside a _Record list
- fieldnames = [f[0] for f in self.fields[1:]]
- __fieldTuples, recLookup, recStruct = self.__recordFields(fieldnames)
- self.__fullRecStruct = recStruct
- self.__fullRecLookup = recLookup
+ @property
+ def fields(self) -> list[Field]:
+ return self.dbf_reader.fields
- def __recordFmt(self, fields: Container[str] | None = None) -> tuple[str, int]:
- """Calculates the format and size of a .dbf record. Optional 'fields' arg
- specifies which fieldnames to unpack and which to ignore. Note that this
- always includes the DeletionFlag at index 0, regardless of the 'fields' arg.
- """
- if self.numRecords is None:
- self.__dbfHeader()
- structcodes = [f"{fieldinfo.size}s" for fieldinfo in self.fields]
- if fields is not None:
- # only unpack specified fields, ignore others using padbytes (x)
- structcodes = [
- code
- if fieldinfo.name in fields
- or fieldinfo.name == "DeletionFlag" # always unpack delflag
- else f"{fieldinfo.size}x"
- for fieldinfo, code in zip(self.fields, structcodes)
- ]
- fmt = "".join(structcodes)
- fmtSize = calcsize(fmt)
- # total size of fields should add up to recordlength from the header
- while fmtSize < self.__recordLength:
- # if not, pad byte until reaches recordlength
- fmt += "x"
- fmtSize += 1
- return (fmt, fmtSize)
+ def record(self, i: int = 0, fields: list[str] | None = None) -> _Record | None:
+ return self.dbf_reader.record(i, fields)
- def __recordFields(
- self, fields: Iterable[str] | None = None
- ) -> tuple[list[Field], dict[str, int], Struct]:
- """Returns the necessary info required to unpack a record's fields,
- restricted to a subset of fieldnames 'fields' if specified.
- Returns a list of field info tuples, a name-index lookup dict,
- and a Struct instance for unpacking these fields. Note that DeletionFlag
- is not a valid field.
- """
- if fields is not None:
- # restrict info to the specified fields
- # first ignore repeated field names (order doesn't matter)
- unique_fields = list(set(fields))
- # get the struct
- fmt, __fmtSize = self.__recordFmt(fields=unique_fields)
- recStruct = Struct(fmt)
- # make sure the given fieldnames exist
- for name in unique_fields:
- if name not in self.__fieldLookup or name == "DeletionFlag":
- raise ValueError(f'"{name}" is not a valid field name')
- # fetch relevant field info tuples
- fieldTuples = []
- for fieldinfo in self.fields[1:]:
- name = fieldinfo[0]
- if name in unique_fields:
- fieldTuples.append(fieldinfo)
- # store the field positions
- recLookup = {f[0]: i for i, f in enumerate(fieldTuples)}
- else:
- # use all the dbf fields
- fieldTuples = self.fields[1:] # sans deletion flag
- recStruct = self.__fullRecStruct
- recLookup = self.__fullRecLookup
- return fieldTuples, recLookup, recStruct
+ def records(self, fields: list[str] | None = None) -> list[_Record]:
+ return self.dbf_reader.records(fields)
- def __record(
+ def iterRecords(
self,
- fieldTuples: list[Field],
- recLookup: dict[str, int],
- recStruct: Struct,
- oid: int | None = None,
- ) -> _Record | None:
- """Reads and returns a dbf record row as a list of values. Requires specifying
- a list of field info Field namedtuples 'fieldTuples', a record name-index dict 'recLookup',
- and a Struct instance 'recStruct' for unpacking these fields.
- """
- f = self.__getFileObj(self.dbf)
-
- # The only format chars in from self.__recordFmt, in recStruct from __recordFields,
- # are s and x (ascii encoded str and pad byte) so everything in recordContents is bytes
- # https://docs.python.org/3/library/struct.html#format-characters
- recordContents = recStruct.unpack(f.read(recStruct.size))
+ fields: list[str] | None = None,
+ start: int = 0,
+ stop: int | None = None,
+ ) -> Iterator[_Record | None]:
+ return self.dbf_reader.iterRecords(fields, start, stop)
- # deletion flag field is always unpacked as first value (see __recordFmt)
- if recordContents[0] != b" ":
- # deleted record
+ def _seek_0_on_file_obj_wrap_or_open_from_name(
+ self,
+ ext: Literal["shp", "shx", "dbf"],
+ file_: BinaryFileT | None,
+ ) -> None | IO[bytes]:
+ if file_ is None:
return None
- # drop deletion flag from values
- recordContents = recordContents[1:]
-
- # check that values match fields
- if len(fieldTuples) != len(recordContents):
- raise ShapefileException(
- f"Number of record values ({len(recordContents)}) is different from the requested "
- f"number of fields ({len(fieldTuples)})"
- )
+ if isinstance(file_, (str, PathLike)):
+ baseName, __ = os.path.splitext(file_)
+ file_obj = _try_get_open_constituent_file(baseName, ext)
+ if file_obj is not None:
+ self._exit_stack.enter_context(file_obj)
+ return file_obj
- # parse each value
- record = []
- for (__name, typ, __size, decimal), value in zip(fieldTuples, recordContents):
- if typ is FieldType.N or typ is FieldType.F:
- # numeric or float: number stored as a string, right justified, and padded with blanks to the width of the field.
- value = value.split(b"\0")[0]
- value = value.replace(b"*", b"") # QGIS NULL is all '*' chars
- if value == b"":
- value = None
- elif decimal:
- try:
- value = float(value)
- except ValueError:
- # not parseable as float, set to None
- value = None
- else:
- # force to int
- try:
- # first try to force directly to int.
- # forcing a large int to float and back to int
- # will lose information and result in wrong nr.
- value = int(value)
- except ValueError:
- # forcing directly to int failed, so was probably a float.
- try:
- value = int(float(value))
- except ValueError:
- # not parseable as int, set to None
- value = None
- elif typ is FieldType.D:
- # date: 8 bytes - date stored as a string in the format YYYYMMDD.
- if (
- not value.replace(b"\x00", b"")
- .replace(b" ", b"")
- .replace(b"0", b"")
- ):
- # dbf date field has no official null value
- # but can check for all hex null-chars, all spaces, or all 0s (QGIS null)
- value = None
+ if hasattr(file_, "read"):
+ # Copy if required
+ try:
+ file_.seek(0)
+ return file_
+ except (NameError, io.UnsupportedOperation):
+ return io.BytesIO(file_.read())
+
+ raise ShapefileException(
+ f"Could not load shapefile constituent file from: {file_}"
+ )
+
+ def _load_from_url(self, url: str) -> None:
+ # Shapefile is from a url
+ # Download each file to temporary path and treat as normal shapefile path
+ urlinfo = urlparse(url)
+ shp_or_dbf_downloaded = False
+ for ext in ["shp", "shx", "dbf"]:
+ sniffed_bytes, resp = _try_to_download_binary_file(
+ urlinfo=urlinfo,
+ ext=ext,
+ suppress_http_errors=True,
+ )
+ if resp is None:
+ continue
+ if ext != "shx":
+ shp_or_dbf_downloaded = True
+ # Use tempfile as source for url data.
+ fileobj = _save_to_named_tmp_file(resp, initial_bytes=sniffed_bytes)
+ setattr(self, f"_{ext}", fileobj)
+ self._exit_stack.enter_context(fileobj)
+ if not shp_or_dbf_downloaded:
+ raise ShapefileException(f"Failed to download .shp or .dbf from: {url}")
+
+ def _load_from_zip(self, path: str) -> None:
+ # Shapefile is inside a zipfile
+ if path.count(".zip") > 1:
+ # Multiple nested zipfiles
+ raise ShapefileException(
+ f"Reading from multiple nested zipfiles is not supported: {path}"
+ )
+ # Split into zipfile and shapefile paths
+ if path.endswith(".zip"):
+ zpath = path
+ shapefile = None
+ else:
+ zpath = path[: path.find(".zip") + 4]
+ shapefile = path[path.find(".zip") + 4 + 1 :]
+
+ zipfileobj: tempfile._TemporaryFileWrapper[bytes] | io.BufferedReader
+ # Create a zip file handle
+ urlinfo = urlparse(zpath)
+
+ resp: ReadableBinStream | None
+ if urlinfo.scheme in SUPPORTED_URL_SCHEMES:
+ # Zipfile is from a url
+ # Download to a temporary file and treat as normal zipfile
+ sniffed_bytes, resp = _try_to_download_binary_file(urlinfo=urlinfo)
+
+ # Use named tmp file as source for zip file data.
+ zipfileobj = _save_to_named_tmp_file(
+ resp,
+ initial_bytes=sniffed_bytes,
+ suffix=".zip",
+ )
+
+ else:
+ # Zipfile is from a file
+ zipfileobj = open(zpath, mode="rb")
+
+ # Open the zipfile archive
+ with zipfile.ZipFile(zipfileobj, "r") as archive:
+ if not shapefile:
+ # Only the zipfile path is given
+ # Inspect zipfile contents to find the full shapefile path
+ shapefiles = [
+ name
+ for name in archive.namelist()
+ if (name.endswith(".SHP") or name.endswith(".shp"))
+ ]
+ # The zipfile must contain exactly one shapefile
+ if len(shapefiles) == 0:
+ raise ShapefileException("Zipfile does not contain any shapefiles")
+ if len(shapefiles) == 1:
+ shapefile = shapefiles[0]
else:
+ raise ShapefileException(
+ f"Zipfile contains more than one shapefile: {shapefiles}. "
+ "Please specify the full path to the shapefile you would like to open."
+ )
+ # Try to extract file-like objects from zipfile
+ shapefile = os.path.splitext(shapefile)[0] # root shapefile name
+ for ext in ["shp", "shx", "dbf"]:
+ for cased_ext in {ext.lower(), ext.upper(), ext}:
try:
- # return as python date object
- y, m, d = int(value[:4]), int(value[4:6]), int(value[6:8])
- value = date(y, m, d)
- except (TypeError, ValueError):
- # if invalid date, just return as unicode string so user can decimalde
- value = str(value.strip())
- elif typ is FieldType.L:
- # logical: 1 byte - initialized to 0x20 (space) otherwise T or F.
- if value == b" ":
- value = None # space means missing or not yet set
- else:
- if value in b"YyTt1":
- value = True
- elif value in b"NnFf0":
- value = False
- else:
- value = None # unknown value is set to missing
- else:
- value = value.decode(self.encoding, self.encodingErrors)
- value = value.strip().rstrip(
- "\x00"
- ) # remove null-padding at end of strings
- record.append(value)
+ member = archive.open(f"{shapefile}.{cased_ext}")
+ # Use read+write tempfile as source for member data.
+ fileobj = _save_to_named_tmp_file(member)
+ setattr(self, f"_{ext.lower()}", fileobj)
+ self._exit_stack.enter_context(fileobj)
+ except (OSError, AttributeError, KeyError):
+ pass
+ # Close and delete the temporary zipfile
+ try:
+ zipfileobj.close()
+ # TODO Does catching all possible exceptions really increase
+ # the chances of closing the zipfile successfully, or does it
+ # just mean .close() failures will still fail, but fail
+ # silently?
+ except: # noqa: E722
+ pass
- return _Record(recLookup, record, oid)
+ def load(self, shapefile: str | None = None) -> None:
+ """Opens a shapefile from a filename or file-like
+ object. Normally this method would be called by the
+ constructor with the file name as an argument."""
+ if shapefile:
+ (shapeName, __ext) = os.path.splitext(shapefile)
+ self.shapeName = shapeName
+ self.load_shp(shapeName)
+ self.load_shx(shapeName)
+ self.load_dbf(shapeName)
+ if not (self._shp or self._dbf):
+ raise ShapefileException(
+ f"Unable to open {shapeName}.dbf or {shapeName}.shp."
+ )
- def record(self, i: int = 0, fields: list[str] | None = None) -> _Record | None:
- """Returns a specific dbf record based on the supplied index.
- To only read some of the fields, specify the 'fields' arg as a
- list of one or more fieldnames.
+ def load_shp(self, shapefile_name: str) -> None:
"""
- f = self.__getFileObj(self.dbf)
- if self.numRecords is None:
- self.__dbfHeader()
- i = self.__restrictIndex(i)
- recSize = self.__recordLength
- f.seek(0)
- f.seek(self.__dbfHdrLength + (i * recSize))
- fieldTuples, recLookup, recStruct = self.__recordFields(fields)
- return self.__record(
- oid=i, fieldTuples=fieldTuples, recLookup=recLookup, recStruct=recStruct
- )
+ Attempts to load file with .shp extension as both lower and upper case
+ """
+ self._shp = _try_get_open_constituent_file(shapefile_name, "shp")
+ if self._shp:
+ self._exit_stack.enter_context(self._shp)
+ self._shpHeader()
- def records(self, fields: list[str] | None = None) -> list[_Record]:
- """Returns all records in a dbf file.
- To only read some of the fields, specify the 'fields' arg as a
- list of one or more fieldnames.
+ def load_shx(self, shapefile_name: str) -> None:
"""
- if self.numRecords is None:
- self.__dbfHeader()
- records = []
- f = self.__getFileObj(self.dbf)
- f.seek(self.__dbfHdrLength)
- fieldTuples, recLookup, recStruct = self.__recordFields(fields)
- # self.__dbfHeader() sets self.numRecords, so it's fine to cast it to int
- # (to tell mypy it's not None).
- for i in range(cast(int, self.numRecords)):
- r = self.__record(
- oid=i, fieldTuples=fieldTuples, recLookup=recLookup, recStruct=recStruct
- )
- if r:
- records.append(r)
- return records
+ Attempts to load file with .shx extension as both lower and upper case
+ """
+ self._shx = _try_get_open_constituent_file(shapefile_name, "shx")
+ if self._shx:
+ self._exit_stack.enter_context(self._shx)
+ self._shxHeader()
- def iterRecords(
- self,
- fields: list[str] | None = None,
- start: int = 0,
- stop: int | None = None,
- ) -> Iterator[_Record | None]:
- """Returns a generator of records in a dbf file.
- Useful for large shapefiles or dbf files.
- To only read some of the fields, specify the 'fields' arg as a
- list of one or more fieldnames.
- By default yields all records. Otherwise, specify start
- (default: 0) or stop (default: number_of_records)
- to only yield record numbers i, where
- start <= i < stop, (or
- start <= i < number_of_records + stop
- if stop < 0).
+ def load_dbf(self, shapefile_name: str) -> None:
"""
- if self.numRecords is None:
- self.__dbfHeader()
- if not isinstance(self.numRecords, int):
+ Attempts to load file with .dbf extension as both lower and upper case
+ """
+ self._dbf = _try_get_open_constituent_file(shapefile_name, "dbf")
+ if self._dbf:
+ self._exit_stack.enter_context(self._dbf)
+ self._get_dbf_reader()
+
def __len__(self) -> int:
    """Return the number of shapes/records in the shapefile.

    The count comes from the first source that is available:

    1. the .dbf reader, when a dbf file is loaded;
    2. ``numShapes`` as read from the .shx header, when an shx file is loaded;
    3. a scan over the record headers of the .shp file.  The result is
       stored in ``numShapes`` and ``_offsets`` so the scan runs only once.

    Returns 0 when neither a dbf nor a shp file is loaded.
    """
    if self._dbf:
        # Preferably use dbf record count
        return len(self.dbf_reader)

    if not self._shp:
        # No file loaded yet, treat as 'empty' shapefile
        return 0

    # Otherwise use shape count
    if self._shx:
        if self.numShapes is None:
            self._shxHeader()

        # ._shxHeader sets self.numShapes or raises Exception
        return cast(int, self.numShapes)

    # Index file not available, iterate all shapes to get total count
    if self.numShapes is None:
        shp = self.shp
        checkpoint = shp.tell()
        try:
            # Determine length of shp file
            shp.seek(0, 2)
            shpLength = shp.tell()
            shp.seek(100)
            # Do a fast shape iteration until end of file.
            offsets = []
            pos = shp.tell()
            while pos < shpLength:
                offsets.append(pos)
                # Unpack the shape header only
                (_rec_num, recLength) = unpack_2_int32_be(shp.read(8))
                # Jump to next shape position
                pos += 8 + (2 * recLength)
                shp.seek(pos)
        finally:
            # Return to previous file position, also when a record header
            # could not be read, so a failed count does not leave the
            # stream somewhere the caller did not put it.
            shp.seek(checkpoint)
        # Set numShapes and offset indices
        self.numShapes = len(offsets)
        self._offsets = offsets

    return self.numShapes
+
def __iter__(self) -> Iterator[ShapeRecord]:
    """Yield every item produced by ``iterShapeRecords()``, in order."""
    for shape_record in self.iterShapeRecords():
        yield shape_record
+
@property
def __geo_interface__(self) -> GeoJSONFeatureCollectionWithBBox:
    """Return all shape records as a GeoJSON feature collection.

    The mapping is the ``__geo_interface__`` of ``shapeRecords()`` with the
    reader's bounding box added under the ``bbox`` key, as a list.
    """
    records = self.shapeRecords()
    return GeoJSONFeatureCollectionWithBBox(
        bbox=list(self.bbox),
        **records.__geo_interface__,
    )
+
@property
def shapeTypeName(self) -> str:
    """Return the SHAPETYPE_LOOKUP entry for this reader's ``shapeType``."""
    return SHAPETYPE_LOOKUP[self.shapeType]
+
+ def _shpHeader(self) -> None:
+ """Reads the header information from a .shp file."""
+ if not self.shp:
raise ShapefileException(
- "Error when reading number of Records in dbf file header"
+ "Shapefile Reader requires a shapefile or file-like object (no shp file found)."
)
- f = self.__getFileObj(self.dbf)
- start = self.__restrictIndex(start)
- if stop is None:
- stop = self.numRecords
- elif abs(stop) > self.numRecords:
- raise IndexError(
- f"abs(stop): {abs(stop)} exceeds number of records: {self.numRecords}."
+
+ shp = self.shp
+ # File length (16-bit word * 2 = bytes)
+ shp.seek(24)
+ self.shpLength = unpack(">i", shp.read(4))[0] * 2
+ # Shape type
+ shp.seek(32)
+ self.shapeType = unpack("<i", shp.read(4))[0]
+ # The shapefile's bounding box (lower left, upper right)
+ # self.bbox: BBox = tuple(_Array("d", unpack("<4d", shp.read(32))))
+ self.bbox: BBox = unpack("<4d", shp.read(32))
+ # xmin, ymin, xmax, ymax = unpack("<4d", shp.read(32))
+ # self.bbox = BBox(xmin=xmin, ymin=ymin, xmax=xmax, ymax=ymax)
+ # Elevation
+ # self.zbox: ZBox = tuple(_Array("d", unpack("<2d", shp.read(16))))
+ self.zbox: ZBox = unpack("<2d", shp.read(16))
+ # zmin, zmax = unpack("<2d", shp.read(16))
+ # self.zbox = ZBox(zmin=zmin, zmax=zmax)
+ # Measure
+ # Measure values less than -10e38 are nodata values according to the spec
+ m_bounds = [
+ float(m_bound) if m_bound >= NODATA else None
+ for m_bound in unpack("<2d", shp.read(16))
+ ]
+ # self.mbox = MBox(mmin=m_bounds[0], mmax=m_bounds[1])
+ self.mbox: tuple[float | None, float | None] = (m_bounds[0], m_bounds[1])
+
+ def _shxHeader(self) -> None:
+ """Reads the header information from a .shx file."""
+ shx = self.shx
+ if not shx:
+ raise ShapefileException(
+ "Shapefile Reader requires a shapefile or file-like object (no shx file found)."
)
- elif stop < 0:
- stop = range(self.numRecords)[stop]
- recSize = self.__recordLength
- f.seek(self.__dbfHdrLength + (start * recSize))
- fieldTuples, recLookup, recStruct = self.__recordFields(fields)
- for i in range(start, stop):
- r = self.__record(
- oid=i, fieldTuples=fieldTuples, recLookup=recLookup, recStruct=recStruct
+ # File length (16-bit word * 2 = bytes) - header length
+ shx.seek(24)
+ shx_records_length_B = (unpack(">i", shx.read(4))[0] * 2) - 100
+ self.numShapes = shx_records_length_B // 8
+
+ def _shxOffsets(self) -> None:
+ """Reads the shape offset positions from a .shx file"""
+ shx = self.shx
+ if not shx:
+ raise ShapefileException(
+ "Shapefile Reader requires a shapefile or file-like object (no shx file found)."
)
- if r:
- yield r
+ if self.numShapes is None:
+ raise ShapefileException(
+ "numShapes must not be None. "
+ " Was there a problem with ._shxHeader() ?"
+ f"Got: {self.numShapes=}"
+ )
+ # Jump to the first record.
+ shx.seek(100)
+ # Each index record consists of two nrs, we only want the first one
+ shxRecords = _Array[int]("i", shx.read(2 * self.numShapes * 4))
+ if sys.byteorder != "big":
+ shxRecords.byteswap()
+ self._offsets = [2 * el for el in shxRecords[::2]]
+
def _shape_index(self, i: int | None = None) -> int | None:
    """Return the offset in the .shp file of shape *i*, taken from the
    .shx index file.

    Returns None when no .shx file is loaded or when *i* is None.  The
    offsets are read from the index file on first use via ``_shxOffsets()``.
    """
    if self._shx and i is not None:
        # The shx file exists, so the offsets can be loaded on demand
        if not self._offsets:
            self._shxOffsets()
        return self._offsets[i]
    # No shx, or no index requested
    return None
+
def __del__(self) -> None:
    """Call ``close()`` when the reader object is finalized."""
    self.close()
+
def close(self) -> None:
    """Close the reader's ExitStack.

    Closing the stack exits every context manager registered on it, e.g.
    the .shx and .dbf files entered by ``load_shx()`` and ``load_dbf()``.
    """
    self._exit_stack.close()
    # NOTE(review): presumably file objects supplied by the user are never
    # registered on the stack and therefore stay open -- confirm in __init__.
+
def __str__(self) -> str:
    """
    Summarise the shapefile: a title line, then one line for the shapes
    (if a shp file is loaded) and one for the records (if a dbf file is).
    """
    lines = ["shapefile Reader"]
    if self.shp:
        shape_count = len(self)
        type_name = SHAPETYPE_LOOKUP[self.shapeType]
        lines.append(f" {shape_count} shapes (type '{type_name}')")
    if self.dbf:
        record_count = len(self)
        field_count = len(self.fields)
        lines.append(f" {record_count} records ({field_count} fields)")
    return "\n".join(lines)
+
def __enter__(self) -> Reader:
    """Enter the reader's ExitStack and return the reader itself."""
    self._exit_stack.__enter__()
    return self
+
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> bool | None:
    """Close the reader when leaving a ``with`` block.

    Always returns None, so an exception raised inside the block is not
    suppressed.  ``exc_type`` is the exception class (not an instance),
    which is what the context-manager protocol passes.
    """
    self.close()
    return None
+
def _shape(self, oid: int | None = None, bbox: BBox | None = None) -> Shape | None:
    """Read one shape record from the current position of the .shp file.

    The 8-byte record header is read first, then exactly the number of
    bytes it announces, so afterwards the file is positioned directly
    after this record.  The record content is parsed by the Shape class
    registered for its shape type in SHAPE_CLASS_FROM_SHAPETYPE.
    """
    shp = self.shp
    (_rec_num, rec_length_words) = unpack_2_int32_be(shp.read(8))

    # Convert from num of 16 bit words, to 8 bit bytes
    rec_length_B = 2 * rec_length_words

    # Read entire record into memory to avoid having to call
    # seek on the file afterwards
    record: ReadSeekableBinStream = io.BytesIO(shp.read(rec_length_B))
    record.seek(0)

    (shape_type,) = unpack("<i", record.read(4))
    shape_cls = SHAPE_CLASS_FROM_SHAPETYPE[shape_type]
    # NOTE(review): presumably from_byte_stream returns None when the shape
    # falls outside bbox (see the return annotation) -- confirm there.
    return shape_cls.from_byte_stream(
        shape_type, record, rec_length_B, oid=oid, bbox=bbox
    )
+
def shape(self, i: int = 0, bbox: BBox | None = None) -> Shape | None:
    """Returns a shape object for a shape in the geometry
    record file.
    If the 'bbox' arg is given (list or tuple of xmin,ymin,xmax,ymax),
    returns None if the shape is not within that region.

    Raises ShapefileException when no shapes are loaded, or when the .shp
    file holds fewer than i shapes and no .shx offset is available.
    """
    total = len(self)
    if total == 0:
        raise ShapefileException("No shapes loaded.")
    i = ensure_within_bounds(i, total)
    offset = self._shape_index(i)
    if not offset:
        # Shx index not available: walk the record headers of the shp
        # file until the requested index or the end of the file.
        shp = self.shp
        shp.seek(0, 2)
        shp_length_B = shp.tell()
        shp.seek(100)
        scanned = 0
        offset = shp.tell()
        while offset < shp_length_B and scanned != i:
            # Unpack the shape header only
            (_rec_num, recLength) = unpack_2_int32_be(shp.read(8))
            # Jump to next shape position
            offset += 8 + (2 * recLength)
            shp.seek(offset)
            scanned += 1
        # If the index was not found, it likely means the .shp file is incomplete
        if scanned != i:
            raise ShapefileException(
                f"Shape index {i} is out of bounds; the .shp file only contains {scanned} shapes"
            )

    # Seek to the offset and read the shape
    self.shp.seek(offset)
    return self._shape(oid=i, bbox=bbox)
+
def shapes(self, bbox: BBox | None = None) -> Shapes:
    """Returns all shapes in a shapefile, collected into a Shapes object.
    To only read shapes within a given spatial region, specify the 'bbox'
    arg as a list or tuple of xmin,ymin,xmax,ymax.
    """
    collected = Shapes()
    shape_iter = self.iterShapes(bbox=bbox)
    collected.extend(shape_iter)
    return collected
+
def iterShapes(self, bbox: BBox | None = None) -> Iterator[Shape | None]:
    """Returns a generator of shapes in a shapefile. Useful
    for handling large shapefiles.
    To only read shapes within a given spatial region, specify the 'bbox'
    arg as a list or tuple of xmin,ymin,xmax,ymax.

    Shapes for which ``_shape()`` returns a falsy value are not yielded.
    When ``numShapes`` is not known, the file is read to its end, and
    ``numShapes`` and ``_offsets`` are filled in once it is exhausted.
    """
    shp = self.shp
    # Found shapefiles which report incorrect
    # shp file length in the header. Can't trust
    # that so we seek to the end of the file
    # and figure it out.
    shp.seek(0, 2)
    shp_length_B = shp.tell()
    shp.seek(100)

    if self.numShapes:
        # Iterate exactly the number of shapes from shx header
        for oid in range(self.numShapes):
            shape = self._shape(oid=oid, bbox=bbox)
            if shape:
                yield shape
        return

    # No shx file, unknown nr of shapes
    # Instead iterate until reach end of file
    # Collect the offset indices during iteration
    offsets = []
    pos = shp.tell()
    while pos < shp_length_B:
        offsets.append(pos)
        # The record number is the count of offsets seen before this one
        shape = self._shape(oid=len(offsets) - 1, bbox=bbox)
        pos = shp.tell()
        if shape:
            yield shape
    # Entire shp file consumed
    # Update the number of shapes and list of offsets
    self.numShapes = len(offsets)
    self._offsets = offsets
def shapeRecord(
self,
@@ -3168,10 +3314,14 @@ class Reader:
If the 'bbox' arg is given (list or tuple of xmin,ymin,xmax,ymax),
returns None if the shape is not within that region.
"""
- i = self.__restrictIndex(i)
+ if self.numRecords is None:
+ raise ShapefileException(
+ "A .dbf file is required to read Records, for ShapeRecords"
+ )
+ i = ensure_within_bounds(i, self.numRecords)
shape = self.shape(i, bbox=bbox)
if shape:
- record = self.record(i, fields=fields)
+ record = self.dbf_reader.record(i, fields=fields)
return ShapeRecord(shape=shape, record=record)
return None
@@ -3204,20 +3354,333 @@ class Reader:
if bbox is None:
# iterate through all shapes and records
for shape, record in zip(
- self.iterShapes(), self.iterRecords(fields=fields)
+ self.iterShapes(), self.dbf_reader.iterRecords(fields=fields)
):
yield ShapeRecord(shape=shape, record=record)
else:
# only iterate where shape.bbox overlaps with the given bbox
- # TODO: internal __record method should be faster but would have to
+ # TODO: internal _record method should be faster but would have to
# make sure to seek to correct file location...
- # fieldTuples,recLookup,recStruct = self.__recordFields(fields)
- for shape in self.iterShapes(bbox=bbox):
- if shape:
- # record = self.__record(oid=i, fieldTuples=fieldTuples, recLookup=recLookup, recStruct=recStruct)
- record = self.record(i=shape.oid, fields=fields)
- yield ShapeRecord(shape=shape, record=record)
+ # fieldTuples,recLookup,recStruct = self._record_fields(fields)
+ for shape in self.iterShapes(bbox=bbox):
+ if shape:
+ # record = self._record(oid=i, fieldTuples=fieldTuples, recLookup=recLookup, recStruct=recStruct)
+ record = self.dbf_reader.record(i=shape.oid, fields=fields)
+ yield ShapeRecord(shape=shape, record=record)
+
+
def _ensure_file_obj(
    f: str | WriteSeekableBinStream | None,
    exit_stack: ExitStack,
    file_mode: str = "wb+",
    ExceptionClass: type[ShapefileException] = ShapefileException,
) -> WriteSeekableBinStream:
    """Safety handler to verify file-like objects.

    If *f* is a path (str), its parent directories are created as needed,
    the file is opened with *file_mode*, and the open file is registered
    on *exit_stack* so that closing the stack closes it.  If *f* has a
    ``write`` attribute it is returned unchanged and is NOT registered on
    the stack.

    Raises *ExceptionClass* when *f* is falsy, or is neither a str nor an
    object with a ``write`` attribute.
    """
    if not f:
        raise ExceptionClass("No file-like object available.")
    if isinstance(f, str):
        pth = os.path.split(f)[0]
        if pth:
            # exist_ok avoids the check-then-create race of testing
            # os.path.exists() first: another process may create the
            # directory between the test and the makedirs call.
            os.makedirs(pth, exist_ok=True)
        fp = open(f, file_mode)
        exit_stack.enter_context(fp)
        return fp

    if hasattr(f, "write"):
        return f
    raise ExceptionClass(f"Unsupported file-like object: {f}")
+
+
def _is_file_obj_open(f: WriteSeekableBinStream | str | None) -> bool:
    """Return True when *f* is truthy and does not report itself closed.

    An object without a ``closed`` attribute counts as open.
    """
    reports_closed = getattr(f, "closed", False) if f else True
    return not reports_closed
+
+
def _try_to_flush_file_obj(f: WriteSeekableBinStream | str | None) -> None:
    """Flush *f* on a best-effort basis.

    Nothing happens when *f* is falsy, has no ``flush`` attribute, or
    reports itself closed.  An OSError raised by ``flush()`` is ignored.
    """
    can_flush = bool(f) and hasattr(f, "flush")
    if can_flush and not getattr(f, "closed", False):
        try:
            f.flush()
        except OSError:
            pass
+
+
class DbfWriter(AbstractContextManager["DbfWriter", None]):
    """Writes .dbf files (dBASE database files), in particular those of Shapefiles.

    Declare the columns with ``field()``, append rows with ``record()``,
    then call ``close()`` (or leave the ``with`` block) to rewrite the
    header with the final record count.
    """

    def __init__(
        self,
        path: str | PathLike[Any] | None = None,
        dbf: str | WriteSeekableBinStream | None = None,
        *,
        encoding: str = "utf-8",
        encodingErrors: str = "strict",
        max_num_fields: int = 2046,
        # Keep kwargs even though unused, to preserve PyShp 2.4 API
        **kwargs: Any,
    ):
        """Set up the writer; the output is only opened on first use of ``dbf``.

        path: target filepath; its extension is replaced by ".dbf".
        dbf: a filepath or writable binary stream, used when path is not given.
        encoding, encodingErrors: codec settings used to encode field names
            and text values.
        max_num_fields: ``field()`` refuses to add more fields than this.

        Raises TypeError when neither path nor dbf is given, or when the
        decoded path is not a str.
        """
        self.path = fsdecode_if_pathlike(path)
        self._dbf: str | WriteSeekableBinStream
        self.fields: list[Field] = []
        self.max_num_fields = max_num_fields
        # Encoding
        self.encoding = encoding
        self.encodingErrors = encodingErrors
        if self.path:
            if not isinstance(self.path, str):
                raise TypeError(
                    f"Path {self.path!r} must be of type str or path-like, not {type(self.path)}."
                )
            self._dbf = os.path.splitext(self.path)[0] + ".dbf"
        elif dbf:
            self._dbf = dbf
        else:
            raise TypeError(
                "Either the target filepath, or dbf must be set to create a .dbf file."
            )

        # Number of records written so far
        self.recNum = 0
        # Not read anywhere in this class
        self.deletionFlag = 0

        # Support not closing opened file objects that were passed in (e.g.
        # ones handled by some external context manager, or by the caller
        # manually calling .close).
        #
        # This will only ever hold at most one context manager.
        # But an ExitStack is the right tool for the job
        # when the number of context manager(s) depends on user input.
        self._exit_stack = ExitStack()

    @functools.cached_property
    def dbf(self) -> WriteSeekableBinStream:
        """The output stream, resolved by ``_ensure_file_obj`` on first access."""
        return _ensure_file_obj(
            self._dbf,
            exit_stack=self._exit_stack,
            ExceptionClass=dbfFileException,
        )

    def __enter__(self) -> DbfWriter:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.close()
        return None

    def close(self) -> None:
        """
        Write final dbf header, close opened files.

        The exit stack is closed even when writing the header fails (for
        example because no field was ever added), so a file this writer
        opened is not left open; the exception still propagates.
        """
        try:
            # Update the dbf header with final length etc
            if _is_file_obj_open(self.dbf):
                self._dbfHeader()

            _try_to_flush_file_obj(self.dbf)
        finally:
            self._exit_stack.close()

    def field(
        # Types of args should match *Field
        self,
        name: str,
        field_type: FieldTypeT = "C",
        size: int = 50,
        decimal: int = 0,
    ) -> None:
        """Adds a dbf field descriptor to the shapefile.

        Raises dbfFileException when ``max_num_fields`` fields already exist.
        """
        if len(self.fields) >= self.max_num_fields:
            raise dbfFileException(
                f".dbf Shapefile Writer reached maximum number of fields: {self.max_num_fields}."
            )
        field_ = Field.from_unchecked(name, field_type, size, decimal)
        self.fields.append(field_)

    def _dbfHeader(self) -> None:
        """Writes the dbf header and field descriptors, starting at offset 0.

        Layout: a 32-byte file header (version, date, record count, header
        length, record length), one 32-byte descriptor per field, then a
        one-byte terminator.

        Raises ShapefileException when there is no field besides
        "DeletionFlag", or when the header length reaches 65535 bytes.
        """
        f = self.dbf
        f.seek(0)
        version = 3
        year, month, day = time.localtime()[:3]
        year -= 1900
        # Get all fields, ignoring DeletionFlag if specified
        fields = [field for field in self.fields if field[0] != "DeletionFlag"]
        # Ensure has at least one field
        if not fields:
            raise ShapefileException(
                "Shapefile dbf file must contain at least one field."
            )
        numRecs = self.recNum
        numFields = len(fields)
        # 32 bytes of file header + 32 per field + 1 terminator byte
        headerLength = numFields * 32 + 33
        if headerLength >= 65535:
            raise ShapefileException(
                "Shapefile dbf header length exceeds maximum length."
            )
        # +1 for the deletion flag byte that starts every record
        recordLength = sum(field.size for field in fields) + 1
        header = pack(
            "<BBBBLHH20x",
            version,
            year,
            month,
            day,
            numRecs,
            headerLength,
            recordLength,
        )
        f.write(header)
        # Field descriptors
        for field in fields:
            encoded_name = field.name.encode(self.encoding, self.encodingErrors)
            encoded_name = encoded_name.replace(b" ", b"_")
            # At most 10 name bytes, null-padded to 11
            encoded_name = encoded_name[:10].ljust(11).replace(b" ", b"\x00")
            encodedFieldType = field.field_type.encode("ascii")
            fld = pack(
                "<11sc4xBB14x",
                encoded_name,
                encodedFieldType,
                field.size,
                field.decimal,
            )
            f.write(fld)
        # Terminator
        f.write(b"\r")

    def record(
        self,
        *recordList: RecordValue,
        **recordDict: RecordValue,
    ) -> None:
        """Creates a dbf attribute record. You can submit either a sequence of
        field values or keyword arguments of field names and values. Before
        adding records you must add fields for the record values using the
        field() method. If the record values exceed the number of fields the
        extra ones won't be added. In the case of using keyword arguments to specify
        field/value pairs only fields matching the already registered fields
        will be added."""
        record: list[RecordValue]
        fieldCount = sum(1 for field in self.fields if field[0] != "DeletionFlag")
        if recordList:
            record = list(recordList)
            # Pad short records with empty values
            while len(record) < fieldCount:
                record.append("")
        elif recordDict:
            record = []
            for field in self.fields:
                if field[0] == "DeletionFlag":
                    continue  # ignore deletionflag field in case it was specified
                if field[0] in recordDict:
                    val = recordDict[field[0]]
                    if val is None:
                        record.append("")
                    else:
                        record.append(val)
                else:
                    record.append("")  # need empty value for missing dict entries
        else:
            # Blank fields for empty record
            record = ["" for _ in range(fieldCount)]
        self.__dbfRecord(record)

    def __dbfRecord(self, record: list[RecordValue]) -> None:
        """Writes one dbf record, preceded by the header if it is the first.

        Raises ShapefileException for an unsupported date value, or when an
        encoded value does not have exactly the size of its field.
        """
        f = self.dbf
        if self.recNum == 0:
            # first record, so all fields should be set
            # allowing us to write the dbf header
            # cannot change the fields after this point
            self._dbfHeader()
        # first byte of the record is deletion flag, always disabled
        f.write(b" ")
        # begin
        self.recNum += 1
        fields = (
            field for field in self.fields if field[0] != "DeletionFlag"
        )  # ignore deletionflag field in case it was specified
        for (fieldName, fieldType, size, deci), value in zip(fields, record):
            # write
            # fieldName, fieldType, size and deci were already checked
            # when their Field instance was created and added to self.fields
            str_val: str | None = None

            if fieldType in ("N", "F"):
                # numeric or float: number stored as a string, right justified, and padded with blanks to the width of the field.
                if value in MISSING:
                    str_val = "*" * size  # QGIS NULL
                elif not deci:
                    # force to int
                    try:
                        # first try to force directly to int.
                        # forcing a large int to float and back to int
                        # will lose information and result in wrong nr.
                        num_val = int(cast(int, value))
                    except ValueError:
                        # forcing directly to int failed, so was probably a float.
                        num_val = int(float(cast(float, value)))
                    str_val = format(num_val, "d")[:size].rjust(
                        size
                    )  # caps the size if exceeds the field size
                else:
                    f_val = float(cast(float, value))
                    str_val = format(f_val, f".{deci}f")[:size].rjust(
                        size
                    )  # caps the size if exceeds the field size
            elif fieldType == "D":
                # date: 8 bytes - date stored as a string in the format YYYYMMDD.
                if isinstance(value, date):
                    str_val = f"{value.year:04d}{value.month:02d}{value.day:02d}"
                elif isinstance(value, list) and len(value) == 3:
                    str_val = f"{value[0]:04d}{value[1]:02d}{value[2]:02d}"
                elif value in MISSING:
                    str_val = "0" * 8  # QGIS NULL for date type
                elif isinstance(value, str) and len(value) == 8:
                    pass  # value is already a date string
                else:
                    raise ShapefileException(
                        "Date values must be either a datetime.date object, a list, a YYYYMMDD string, or a missing value."
                    )
            elif fieldType == "L":
                # logical: 1 byte - initialized to 0x20 (space) otherwise T or F.
                if value in MISSING:
                    str_val = " "  # missing is set to space
                elif value in [True, 1]:
                    str_val = "T"
                elif value in [False, 0]:
                    str_val = "F"
                else:
                    str_val = " "  # unknown is set to space

            if str_val is None:
                # Types C and M, and anything else, value is forced to string,
                # encoded by the codec specified to the Writer (utf-8 by default),
                # then the resulting bytes are padded and truncated to the length
                # of the field
                encoded = (
                    str(value)
                    .encode(self.encoding, self.encodingErrors)[:size]
                    .ljust(size)
                )
            else:
                # str_val was given a not-None string value
                # under the checks for fieldTypes "N", "F", "D", or "L" above.
                # Numeric, logical, and date values are ascii already, but
                # for Shapefile or dbf spec reasons
                # "should be default ascii encoding"
                encoded = str_val.encode("ascii", self.encodingErrors)

            if len(encoded) != size:
                raise ShapefileException(
                    f"Shapefile Writer unable to pack incorrect sized {value=}"
                    f" (encoded as {len(encoded)}B) into field '{fieldName}' ({size}B)."
                )
            f.write(encoded)
class Writer:
@@ -3239,61 +3702,105 @@ class Writer:
# Keep kwargs even though unused, to preserve PyShp 2.4 API
**kwargs: Any,
):
+ target = fsdecode_if_pathlike(target)
self.target = target
+
+ # User settable - see ### Geometry and Record Balancing in README.md
self.autoBalance = autoBalance
- self.fields: list[Field] = []
+
+ self.encoding = encoding
+ self.encodingErrors = encodingErrors
+
+ # User settable - see #### Setting the Shape Type in README.md
self.shapeType = shapeType
- self.shp: WriteSeekableBinStream | None = None
- self.shx: WriteSeekableBinStream | None = None
- self.dbf: WriteSeekableBinStream | None = None
- self._files_to_close: list[BinaryFileStreamT] = []
+
+ self._shp: str | WriteSeekableBinStream | None = shp
+ self._shx: str | WriteSeekableBinStream | None = shx
+ self._dbf: str | WriteSeekableBinStream | None = dbf
+ self._dbf_writer: DbfWriter | None = None
+ self._exit_stack = ExitStack()
if target:
- target = fsdecode_if_pathlike(target)
if not isinstance(target, str):
raise TypeError(
f"The target filepath {target!r} must be of type str/unicode or path-like, not {type(target)}."
)
- self.shp = self.__getFileObj(os.path.splitext(target)[0] + ".shp")
- self.shx = self.__getFileObj(os.path.splitext(target)[0] + ".shx")
- self.dbf = self.__getFileObj(os.path.splitext(target)[0] + ".dbf")
- elif shp or shx or dbf:
- if shp:
- self.shp = self.__getFileObj(shp)
- if shx:
- self.shx = self.__getFileObj(shx)
- if dbf:
- self.dbf = self.__getFileObj(dbf)
- else:
+ self._shp = os.path.splitext(target)[0] + ".shp"
+ self._shx = os.path.splitext(target)[0] + ".shx"
+ self._dbf = os.path.splitext(target)[0] + ".dbf"
+ elif not (shp or shx or dbf):
raise TypeError(
"Either the target filepath, or any of shp, shx, or dbf must be set to create a shapefile."
)
+ if self._dbf:
+ self._dbf_writer = DbfWriter(
+ target=target,
+ dbf=self._dbf,
+ encoding=encoding,
+ encodingErrors=encodingErrors,
+ )
# Initiate with empty headers, to be finalized upon closing
- if self.shp:
+ if self._shp:
self.shp.write(b"9" * 100)
- if self.shx:
+ if self._shx:
self.shx.write(b"9" * 100)
# Geometry record offsets and lengths for writing shx file.
- self.recNum = 0
self.shpNum = 0
self._bbox: BBox | None = None
self._zbox: ZBox | None = None
self._mbox: MBox | None = None
# Use deletion flags in dbf? Default is false (0). Note: Currently has no effect, records should NOT contain deletion flags.
self.deletionFlag = 0
- # Encoding
- self.encoding = encoding
- self.encodingErrors = encodingErrors
+
@functools.cached_property
def shp(self) -> WriteSeekableBinStream:
    """The .shp output stream, resolved from ``self._shp`` on first access.

    ``_ensure_file_obj`` opens a path and registers the file on the
    writer's exit stack, or returns a given stream as is.
    """
    return _ensure_file_obj(
        self._shp,
        exit_stack=self._exit_stack,
    )
+
@functools.cached_property
def shx(self) -> WriteSeekableBinStream:
    """The .shx output stream, resolved from ``self._shx`` on first access.

    ``_ensure_file_obj`` opens a path and registers the file on the
    writer's exit stack, or returns a given stream as is.
    """
    return _ensure_file_obj(
        self._shx,
        exit_stack=self._exit_stack,
    )
+
@functools.cached_property
def dbf_writer(self) -> DbfWriter:
    """The DbfWriter owned by this Writer.

    On first access the DbfWriter is entered on the Writer's exit stack,
    so closing the stack also exits it.  Raises dbfFileException when the
    Writer was created without a dbf target.
    """
    writer = self._dbf_writer
    if writer is None:
        raise dbfFileException(
            f"No dbf file. Got target: {self.target} & dbf: {self._dbf}"
        )
    self._exit_stack.enter_context(writer)
    return writer
+
@property
def dbf(self) -> WriteSeekableBinStream:
    """The .dbf output stream of the owned DbfWriter."""
    return self.dbf_writer.dbf
+
@property
def fields(self) -> list[Field]:
    """The field list of the owned DbfWriter (the same list object, not a copy)."""
    return self.dbf_writer.fields

@fields.setter
def fields(self, value: list[Field]) -> None:
    # Rebinds the DbfWriter's list, so both objects keep seeing the same fields
    self.dbf_writer.fields = value
+
@property
def recNum(self) -> int:
    """Number of records written so far; 0 when the Writer has no DbfWriter."""
    return self.dbf_writer.recNum if self._dbf_writer else 0
def __len__(self) -> int:
"""Returns the current number of features written to the shapefile.
If shapes and records are unbalanced, the length is considered the highest
of the two."""
- return max(self.recNum, self.shpNum)
+ if not self._dbf_writer:
+ return self.shpNum
+ return max(self.dbf_writer.recNum, self.shpNum)
def __enter__(self) -> Writer:
- """
- Enter phase of context manager.
- """
return self
def __exit__(
@@ -3302,9 +3809,6 @@ class Writer:
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool | None:
- """
- Exit phase of context manager, finish writing and close the files.
- """
self.close()
return None
@@ -3315,89 +3819,63 @@ class Writer:
"""
Write final shp, shx, and dbf headers, close opened files.
"""
- # Check if any of the files have already been closed
- shp_open = self.shp and not (hasattr(self.shp, "closed") and self.shp.closed)
- shx_open = self.shx and not (hasattr(self.shx, "closed") and self.shx.closed)
- dbf_open = self.dbf and not (hasattr(self.dbf, "closed") and self.dbf.closed)
+
+ # Check if user supplied shp or dbf file objects have
+ # already been closed by the user for some reason.
+ #
+ # TODO: Do we really need to support this? A user who supplies
+ # custom file objects for shp and dbf, opens a Writer
+ # to partially write the Shapefile, manually closes the .shp object
+ # but uses the context manager or calls .close expecting
+ # Writer to create the dbf's header? Really?
+ shp_open = self._shp and _is_file_obj_open(self.shp)
+ dbf_open = (
+ False
+ if self._dbf_writer is None
+ else _is_file_obj_open(self.dbf_writer.dbf)
+ )
# Balance if already not balanced
- if self.shp and shp_open and self.dbf and dbf_open:
+ if shp_open and dbf_open:
if self.autoBalance:
self.balance()
- if self.recNum != self.shpNum:
+ if self.dbf_writer.recNum != self.shpNum:
raise ShapefileException(
"When saving both the dbf and shp file, "
- f"the number of records ({self.recNum}) must correspond "
+ f"the number of records ({self.dbf_writer.recNum}) must correspond "
f"with the number of shapes ({self.shpNum})"
)
- # Fill in the blank headers
- if self.shp and shp_open:
- self.__shapefileHeader(self.shp, headerType="shp")
- if self.shx and shx_open:
- self.__shapefileHeader(self.shx, headerType="shx")
- # Update the dbf header with final length etc
- if self.dbf and dbf_open:
- self.__dbfHeader()
+ # Fill in the blank headers and flush files
+ if shp_open:
+ self._shp_or_shx_header(self.shp, headerType="shp")
+ _try_to_flush_file_obj(self.shp)
- # Flush files
- for attribute in (self.shp, self.shx, self.dbf):
- if attribute is None:
- continue
- if hasattr(attribute, "flush") and not getattr(attribute, "closed", False):
- try:
- attribute.flush()
- except OSError:
- pass
+ if self._shx and _is_file_obj_open(self.shx):
+ self._shp_or_shx_header(self.shx, headerType="shx")
+ _try_to_flush_file_obj(self.shx)
- # Close any files that the writer opened (but not those given by user)
- for attribute in self._files_to_close:
- if hasattr(attribute, "close"):
- try:
- attribute.close()
- except OSError:
- pass
- self._files_to_close = []
+ # Ensure any files that the writer opened are closed.
+ # (contains self.dbf_writer, which is triggered to
+ # write its header here, but should not contain
+ # user-supplied, already opened file objects that
+ # might be closed by an outer context manager).
+ # Idempotent.
+ self._exit_stack.close()
- @overload
- def __getFileObj(self, f: str) -> WriteSeekableBinStream: ...
- @overload
- def __getFileObj(self, f: None) -> NoReturn: ...
- @overload
- def __getFileObj(self, f: WriteSeekableBinStream) -> WriteSeekableBinStream: ...
- def __getFileObj(
- self, f: str | None | WriteSeekableBinStream
- ) -> WriteSeekableBinStream:
- """Safety handler to verify file-like objects"""
- if not f:
- raise ShapefileException("No file-like object available.")
- if isinstance(f, str):
- pth = os.path.split(f)[0]
- if pth and not os.path.exists(pth):
- os.makedirs(pth)
- fp = open(f, "wb+")
- self._files_to_close.append(fp)
- return fp
-
- if hasattr(f, "write"):
- return f
- raise ShapefileException(f"Unsupported file-like object: {f}")
-
- def __shpFileLength(self) -> int:
+ def _shp_file_length_B(self) -> int:
"""Calculates the file length of the shp file."""
- shp = self.__getFileObj(self.shp)
-
# Remember starting position
+ start_B = self.shp.tell()
- start = shp.tell()
# Calculate size of all shapes
- shp.seek(0, 2)
- size = shp.tell()
+ self.shp.seek(0, 2)
+ size_16b_words = self.shp.tell()
# Calculate size as 16-bit words
- size //= 2
+ size_B = size_16b_words // 2
# Return to start
- shp.seek(start)
- return size
+ self.shp.seek(start_B)
+ return size_B
def _update_file_bbox(self, s: Shape) -> None:
if s.shapeType == NULL:
@@ -3459,134 +3937,65 @@ class Writer:
"""Returns the current m extremes for the shapefile."""
return self._mbox
- def __shapefileHeader(
+ def _shp_or_shx_header(
self,
- fileObj: WriteSeekableBinStream | None,
- headerType: Literal["shp", "dbf", "shx"] = "shp",
+ f: WriteSeekableBinStream,
+ headerType: Literal["shp", "shx"],
) -> None:
"""Writes the specified header type to the specified file-like object.
Several of the shapefile formats are so similar that a single generic
method to read or write them is warranted."""
- f = self.__getFileObj(fileObj)
f.seek(0)
# File code, Unused bytes
f.write(pack(">6i", 9994, 0, 0, 0, 0, 0))
# File length (Bytes / 2 = 16-bit words)
if headerType == "shp":
- f.write(pack(">i", self.__shpFileLength()))
+ f.write(pack(">i", self._shp_file_length_B()))
elif headerType == "shx":
f.write(pack(">i", ((100 + (self.shpNum * 8)) // 2)))
# Version, Shape type
if self.shapeType is None:
self.shapeType = NULL
f.write(pack("<2i", 1000, self.shapeType))
- # The shapefile's bounding box (lower left, upper right)
- if self.shapeType != 0:
- try:
- bbox = self.bbox()
- if bbox is None:
- # The bbox is initialized with None, so this would mean the shapefile contains no valid geometries.
- # In such cases of empty shapefiles, ESRI spec says the bbox values are 'unspecified'.
- # Not sure what that means, so for now just setting to 0s, which is the same behavior as in previous versions.
- # This would also make sense since the Z and M bounds are similarly set to 0 for non-Z/M type shapefiles.
- # bbox = BBox(0, 0, 0, 0)
- bbox = (0, 0, 0, 0)
- f.write(pack("<4d", *bbox))
- except error:
- raise ShapefileException(
- "Failed to write shapefile bounding box. Floats required."
- )
- else:
- f.write(pack("<4d", 0, 0, 0, 0))
- # Elevation
- if self.shapeType in PointZ_shapeTypes | _HasZ_shapeTypes:
- # Z values are present in Z type
- zbox = self.zbox()
- if zbox is None:
- # means we have empty shapefile/only null geoms (see commentary on bbox above)
- # zbox = ZBox(0, 0)
- zbox = (0, 0)
- else:
- # As per the ESRI shapefile spec, the zbox for non-Z type shapefiles are set to 0s
- # zbox = ZBox(0, 0)
- zbox = (0, 0)
- # Measure
- if self.shapeType in PointM_shapeTypes | _HasM_shapeTypes:
- # M values are present in M or Z type
- mbox = self.mbox()
- if mbox is None:
- # means we have empty shapefile/only null geoms (see commentary on bbox above)
- # mbox = MBox(0, 0)
- mbox = (0, 0)
- else:
- # As per the ESRI shapefile spec, the mbox for non-M type shapefiles are set to 0s
- # mbox = MBox(0, 0)
- mbox = (0, 0)
- # Try writing
+
+ # BBox, the shapefile's bounding box (lower left, upper right)
+ # In such cases of empty shapefiles, ESRI spec says the bbox values are 'unspecified'.
+ # Not sure what that means, so for now just setting to 0s, which is the same behavior as in previous versions.
+ # This would also make sense since the Z and M bounds are similarly set to 0 for non-Z/M type shapefiles.
+ # bbox: BBox = (0, 0, 0, 0)
+ bbox = self.bbox() or (0, 0, 0, 0)
try:
- f.write(pack("<4d", zbox[0], zbox[1], mbox[0], mbox[1]))
- except error:
+ f.write(pack("<4d", *bbox))
+ except StructError:
raise ShapefileException(
- "Failed to write shapefile elevation and measure values. Floats required."
+ "Failed to write shapefile bounding box. Floats required."
)
- def __dbfHeader(self) -> None:
- """Writes the dbf header and field descriptors."""
- f = self.__getFileObj(self.dbf)
- f.seek(0)
- version = 3
- year, month, day = time.localtime()[:3]
- year -= 1900
- # Get all fields, ignoring DeletionFlag if specified
- fields = [field for field in self.fields if field[0] != "DeletionFlag"]
- # Ensure has at least one field
- if not fields:
- raise ShapefileException(
- "Shapefile dbf file must contain at least one field."
- )
- numRecs = self.recNum
- numFields = len(fields)
- headerLength = numFields * 32 + 33
- if headerLength >= 65535:
+ # Elevation
+ # As per the ESRI shapefile spec, the zbox for non-Z type shapefiles are set to 0s
+ # zbox : ZBox = (0, 0). We also do this for empty shapefiles and null-shapes only files.
+ zbox = self.zbox() or (0, 0)
+
+ # Ms
+ # As per the ESRI shapefile spec, the mbox for non-M type shapefiles are set to 0s
+ # mbox: Mbox = (0, 0). We also do this for empty shapefiles and null-shapes only files.
+ mbox = self.mbox() or (0, 0)
+
+ # Try writing
+ try:
+ f.write(pack("<4d", *zbox, *mbox))
+ except StructError:
raise ShapefileException(
- "Shapefile dbf header length exceeds maximum length."
- )
- recordLength = sum(field.size for field in fields) + 1
- header = pack(
- "<BBBBLHH20x",
- version,
- year,
- month,
- day,
- numRecs,
- headerLength,
- recordLength,
- )
- f.write(header)
- # Field descriptors
- for field in fields:
- encoded_name = field.name.encode(self.encoding, self.encodingErrors)
- encoded_name = encoded_name.replace(b" ", b"_")
- encoded_name = encoded_name[:10].ljust(11).replace(b" ", b"\x00")
- encodedFieldType = field.field_type.encode("ascii")
- fld = pack(
- "<11sc4xBB14x",
- encoded_name,
- encodedFieldType,
- field.size,
- field.decimal,
+ "Failed to write shapefile elevation and measure values. Floats required."
)
- f.write(fld)
- # Terminator
- f.write(b"\r")
def shape(
self,
s: Shape | HasGeoInterface | GeoJSONHomogeneousGeometryObject,
) -> None:
 # Balance if not already balanced
- if self.autoBalance and self.recNum < self.shpNum:
+ if self.autoBalance and self.dbf_writer.recNum < self.shpNum:
self.balance()
# Check is shape or import from geojson
if not isinstance(s, Shape):
@@ -3603,12 +4012,12 @@ class Writer:
)
s = Shape._from_geojson(shape_dict)
# Write to file
- offset, length = self.__shpRecord(s)
- if self.shx:
- self.__shxRecord(offset, length)
+ offset, length = self._shp_record(s)
+ if self._shx:
+ self._shx_record(offset, length)
- def __shpRecord(self, s: Shape) -> tuple[int, int]:
- f: WriteSeekableBinStream = self.__getFileObj(self.shp)
+ def _shp_record(self, s: Shape) -> tuple[int, int]:
+ f = self.shp
offset = f.tell()
self.shpNum += 1
@@ -3668,13 +4077,13 @@ class Writer:
f.write(b_io.read())
return offset, length
- def __shxRecord(self, offset: int, length: int) -> None:
+ def _shx_record(self, offset: int, length: int) -> None:
"""Writes the shx records."""
- f = self.__getFileObj(self.shx)
+ f = self.shx
try:
f.write(pack(">i", offset // 2))
- except error:
+ except StructError:
raise ShapefileException(
"The .shp file has reached its file size limit > 4294967294 bytes (4.29 GB). To fix this, break up your file into multiple smaller ones."
)
@@ -3685,140 +4094,28 @@ class Writer:
*recordList: RecordValue,
**recordDict: RecordValue,
) -> None:
- """Creates a dbf attribute record. You can submit either a sequence of
- field values or keyword arguments of field names and values. Before
- adding records you must add fields for the record values using the
- field() method. If the record values exceed the number of fields the
- extra ones won't be added. In the case of using keyword arguments to specify
- field/value pairs only fields matching the already registered fields
- will be added."""
 # Balance if not already balanced
- if self.autoBalance and self.recNum > self.shpNum:
+ if self.autoBalance and self.dbf_writer.recNum > self.shpNum:
self.balance()
- record: list[RecordValue]
- fieldCount = sum(1 for field in self.fields if field[0] != "DeletionFlag")
- if recordList:
- record = list(recordList)
- while len(record) < fieldCount:
- record.append("")
- elif recordDict:
- record = []
- for field in self.fields:
- if field[0] == "DeletionFlag":
- continue # ignore deletionflag field in case it was specified
- if field[0] in recordDict:
- val = recordDict[field[0]]
- if val is None:
- record.append("")
- else:
- record.append(val)
- else:
- record.append("") # need empty value for missing dict entries
- else:
- # Blank fields for empty record
- record = ["" for _ in range(fieldCount)]
- self.__dbfRecord(record)
-
- def __dbfRecord(self, record: list[RecordValue]) -> None:
- """Writes the dbf records."""
- f = self.__getFileObj(self.dbf)
- if self.recNum == 0:
- # first records, so all fields should be set
- # allowing us to write the dbf header
- # cannot change the fields after this point
- self.__dbfHeader()
- # first byte of the record is deletion flag, always disabled
- f.write(b" ")
- # begin
- self.recNum += 1
- fields = (
- field for field in self.fields if field[0] != "DeletionFlag"
- ) # ignore deletionflag field in case it was specified
- for (fieldName, fieldType, size, deci), value in zip(fields, record):
- # write
- # fieldName, fieldType, size and deci were already checked
- # when their Field instance was created and added to self.fields
- str_val: str | None = None
-
- if fieldType in ("N", "F"):
- # numeric or float: number stored as a string, right justified, and padded with blanks to the width of the field.
- if value in MISSING:
- str_val = "*" * size # QGIS NULL
- elif not deci:
- # force to int
- try:
- # first try to force directly to int.
- # forcing a large int to float and back to int
- # will lose information and result in wrong nr.
- num_val = int(cast(int, value))
- except ValueError:
- # forcing directly to int failed, so was probably a float.
- num_val = int(float(cast(float, value)))
- str_val = format(num_val, "d")[:size].rjust(
- size
- ) # caps the size if exceeds the field size
- else:
- f_val = float(cast(float, value))
- str_val = format(f_val, f".{deci}f")[:size].rjust(
- size
- ) # caps the size if exceeds the field size
- elif fieldType == "D":
- # date: 8 bytes - date stored as a string in the format YYYYMMDD.
- if isinstance(value, date):
- str_val = f"{value.year:04d}{value.month:02d}{value.day:02d}"
- elif isinstance(value, list) and len(value) == 3:
- str_val = f"{value[0]:04d}{value[1]:02d}{value[2]:02d}"
- elif value in MISSING:
- str_val = "0" * 8 # QGIS NULL for date type
- elif isinstance(value, str) and len(value) == 8:
- pass # value is already a date string
- else:
- raise ShapefileException(
- "Date values must be either a datetime.date object, a list, a YYYYMMDD string, or a missing value."
- )
- elif fieldType == "L":
- # logical: 1 byte - initialized to 0x20 (space) otherwise T or F.
- if value in MISSING:
- str_val = " " # missing is set to space
- elif value in [True, 1]:
- str_val = "T"
- elif value in [False, 0]:
- str_val = "F"
- else:
- str_val = " " # unknown is set to space
-
- if str_val is None:
- # Types C and M, and anything else, value is forced to string,
- # encoded by the codec specified to the Writer (utf-8 by default),
- # then the resulting bytes are padded and truncated to the length
- # of the field
- encoded = (
- str(value)
- .encode(self.encoding, self.encodingErrors)[:size]
- .ljust(size)
- )
- else:
- # str_val was given a not-None string value
- # under the checks for fieldTypes "N", "F", "D", or "L" above
- # Numeric, logical, and date numeric types are ascii already, but
- # for Shapefile or dbf spec reasons
- # "should be default ascii encoding"
- encoded = str_val.encode("ascii", self.encodingErrors)
+ self.dbf_writer.record(*recordList, **recordDict)
- if len(encoded) != size:
- raise ShapefileException(
- f"Shapefile Writer unable to pack incorrect sized {value=}"
- f" (encoded as {len(encoded)}B) into field '{fieldName}' ({size}B)."
- )
- f.write(encoded)
+ def field(
+ # Types of args should match *Field
+ self,
+ name: str,
+ field_type: FieldTypeT = "C",
+ size: int = 50,
+ decimal: int = 0,
+ ) -> None:
+ self.dbf_writer.field(name, field_type, size, decimal)
def balance(self) -> None:
"""Adds corresponding empty attributes or null geometry records depending
on which type of record was created to make sure all three files
are in synch."""
- while self.recNum > self.shpNum:
+ while self.dbf_writer.recNum > self.shpNum:
self.null()
- while self.recNum < self.shpNum:
+ while self.dbf_writer.recNum < self.shpNum:
self.record()
def null(self) -> None:
@@ -3928,22 +4225,6 @@ class Writer:
shape = MultiPatch(lines=parts, partTypes=partTypes)
self.shape(shape)
- def field(
- # Types of args should match *Field
- self,
- name: str,
- field_type: FieldTypeT = "C",
- size: int = 50,
- decimal: int = 0,
- ) -> None:
- """Adds a dbf field descriptor to the shapefile."""
- if len(self.fields) >= 2046:
- raise ShapefileException(
- "Shapefile Writer reached maximum number of fields: 2046."
- )
- field_ = Field.from_unchecked(name, field_type, size, decimal)
- self.fields.append(field_)
-
# Begin Testing
def _get_doctests() -> doctest.DocTest:
=====================================
test_shapefile.py
=====================================
@@ -504,7 +504,9 @@ def test_reader_url():
with Reader(url) as sf:
for __recShape in sf.iterShapeRecords():
pass
- assert sf.shp.closed is sf.shx.closed is sf.dbf.closed is True
+ assert sf.shp.closed
+ assert sf._shx is None or sf.shx.closed
+ assert sf.dbf.closed
# test without extension
url = "https://github.com/nvkelso/natural-earth-vector/blob/master/110m_cultural/ne_110m_admin_0_tiny_countries?raw=true"
@@ -512,7 +514,9 @@ def test_reader_url():
for __recShape in sf.iterShapeRecords():
pass
assert len(sf) > 0
- assert sf.shp.closed is sf.shx.closed is sf.dbf.closed is True
+ assert sf.shp.closed
+ assert sf._shx is None or sf.shx.closed
+ assert sf.dbf.closed
# test no files found
url = "https://raw.githubusercontent.com/nvkelso/natural-earth-vector/master/README.md"
@@ -766,7 +770,7 @@ def test_reader_shp_shx_only():
def test_reader_shp_shx_only_from_Paths():
"""
Assert that specifying just the
- shp and shx argument to the shapefile reader as Paths
+ shp and shx arguments to the shapefile reader as Paths
reads just the shp and shx file.
"""
with shapefile.Reader(
@@ -780,7 +784,7 @@ def test_reader_shp_shx_only_from_Paths():
def test_reader_shp_dbf_only():
"""
Assert that specifying just the
- shp and shx argument to the shapefile reader
+ shp and dbf arguments to the shapefile reader
reads just the shp and dbf file.
"""
with shapefile.Reader(
@@ -796,7 +800,7 @@ def test_reader_shp_dbf_only():
def test_reader_shp_dbf_only_from_Paths():
"""
Assert that specifying just the
- shp and shx argument to the shapefile reader as Paths
+ shp and dbf arguments to the shapefile reader as Paths
reads just the shp and dbf file.
"""
with shapefile.Reader(
@@ -891,8 +895,9 @@ def test_reader_filelike_shp_only():
def test_reader_shapefile_delayed_load():
"""
- Assert that the filename's extension is
- ignored when reading a shapefile.
+ Assert that both:
+ i) reading a shape from an uninitialised Reader() raises ShapefileException and,
+ ii) it can still load a shapefile for reading afterwards, via .load(...).
"""
with shapefile.Reader() as sf:
# assert that data request raises exception, since no file has been provided yet
@@ -1475,6 +1480,26 @@ def test_shaperecord_record():
assert record[1:3] == ["060750601001", 4715]
+def test_reader_zip_polyylinez_no_m_itershaperecords():
+ """
+ Make sure the M field is initialised to None (so the
+ fix for the bug in 3.0.2 isn't regressed)!
+
+ Test PolygonZ shapes can be read, even if the m field is missing
+ (all the points in this file are 2D only, so this could also be
+ saved as a Polygon / type 5 shapefile instead of the shape type
+ 15 one it currently is).
+
+ REL.zip included with permission: https://github.com/OpenNHM/AvaFrame/issues/1203#issuecomment-4477589128
+ Owner: Open Natural Hazard Modelling
+ Original source: https://github.com/OpenNHM/AvaFrameData/blob/main/avaPopeletzbach/
+ License CC-BY-4.0
+ """
+ with shapefile.Reader("shapefiles/test/REL.zip") as sf:
+ for _shaperec in sf.iterShapeRecords():
+ pass
+
+
def test_write_field_name_limit(tmpdir):
"""
Abc...
@@ -1505,7 +1530,7 @@ def test_write_shp_only(tmpdir):
filename = tmpdir.join("test").strpath
with shapefile.Writer(shp=filename + ".shp") as writer:
writer.point(1, 1)
- assert writer.shp and not writer.shx and not writer.dbf
+ assert writer.shp and not writer._shx and not writer._dbf
assert writer.shpNum == 1
assert len(writer) == 1
assert writer.shp.closed is True
@@ -1515,7 +1540,7 @@ def test_write_shp_only(tmpdir):
# test that can read shapes
with shapefile.Reader(shp=filename + ".shp") as reader:
- assert reader.shp and not reader.shx and not reader.dbf
+ assert reader._shp and not reader._shx and not reader._dbf
assert (reader.numRecords, reader.numShapes) == (
None,
None,
@@ -1538,7 +1563,7 @@ def test_write_shp_shx_only(tmpdir):
filename = tmpdir.join("test").strpath
with shapefile.Writer(shp=filename + ".shp", shx=filename + ".shx") as writer:
writer.point(1, 1)
- assert writer.shp and writer.shx and not writer.dbf
+ assert writer.shp and writer.shx and not writer._dbf
assert writer.shpNum == 1
assert len(writer) == 1
assert writer.shp.closed is writer.shx.closed is True
@@ -1551,7 +1576,7 @@ def test_write_shp_shx_only(tmpdir):
# test that can read shapes and offsets
with shapefile.Reader(shp=filename + ".shp", shx=filename + ".shx") as reader:
- assert reader.shp and reader.shx and not reader.dbf
+ assert reader.shp and reader.shx and not reader._dbf
assert (reader.numRecords, reader.numShapes) == (None, 1)
reader.shape(0) # trigger reading of shx offsets
assert len(reader._offsets) == 1
@@ -1572,7 +1597,7 @@ def test_write_shp_dbf_only(tmpdir):
writer.field("field1", "C") # required to create a valid dbf file
writer.record("value")
writer.point(1, 1)
- assert writer.shp and not writer.shx and writer.dbf
+ assert writer.shp and not writer._shx and writer.dbf
assert writer.shpNum == writer.recNum == 1
assert len(writer) == 1
assert writer.shp.closed is writer.dbf.closed is True
@@ -1585,7 +1610,7 @@ def test_write_shp_dbf_only(tmpdir):
# test that can read records and shapes
with shapefile.Reader(shp=filename + ".shp", dbf=filename + ".dbf") as reader:
- assert reader.shp and not reader.shx and reader.dbf
+ assert reader.shp and not reader._shx and reader.dbf
assert (reader.numRecords, reader.numShapes) == (
1,
None,
@@ -1607,7 +1632,7 @@ def test_write_dbf_only(tmpdir):
with shapefile.Writer(dbf=filename + ".dbf") as writer:
writer.field("field1", "C") # required to create a valid dbf file
writer.record("value")
- assert not writer.shp and not writer.shx and writer.dbf
+ assert not writer._shp and not writer._shx and writer.dbf
assert writer.recNum == 1
assert len(writer) == 1
assert writer.dbf.closed is True
@@ -1617,7 +1642,7 @@ def test_write_dbf_only(tmpdir):
# test that can read records
with shapefile.Reader(dbf=filename + ".dbf") as reader:
- assert not writer.shp and not writer.shx and writer.dbf
+ assert not reader._shp and not reader._shx and reader.dbf
assert (reader.numRecords, reader.numShapes) == (1, None)
assert len(reader.records()) == 1
View it on GitLab: https://salsa.debian.org/debian-gis-team/pyshp/-/compare/e38eaaf2df7b0257928478fee2fb359f2e32e739...43f51f32c57389d0d2b4f55844ab0e30fc8c9ffd
--
View it on GitLab: https://salsa.debian.org/debian-gis-team/pyshp/-/compare/e38eaaf2df7b0257928478fee2fb359f2e32e739...43f51f32c57389d0d2b4f55844ab0e30fc8c9ffd
You're receiving this email because of your account on salsa.debian.org. Manage all notifications: https://salsa.debian.org/-/profile/notifications | Help: https://salsa.debian.org/help
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-grass-devel/attachments/20260523/bb827039/attachment-0001.htm>
More information about the Pkg-grass-devel
mailing list