[med-svn] [Git][med-team/python-hl7][master] 6 commits: Fix watchfile to detect new versions on github
Andreas Tille (@tille)
gitlab at salsa.debian.org
Thu Oct 7 09:22:25 BST 2021
Andreas Tille pushed to branch master at Debian Med / python-hl7
Commits:
aa37beac by Andreas Tille at 2021-10-07T09:55:29+02:00
Fix watchfile to detect new versions on github
- - - - -
b47ce5b3 by Andreas Tille at 2021-10-07T09:55:31+02:00
routine-update: New upstream version
- - - - -
12e51917 by Andreas Tille at 2021-10-07T09:55:32+02:00
New upstream version 0.4.2
- - - - -
b1387175 by Andreas Tille at 2021-10-07T09:55:32+02:00
Update upstream source from tag 'upstream/0.4.2'
Update to upstream version '0.4.2'
with Debian dir b5591118320b5025a93a36da683b4f2374649549
- - - - -
aca99fe3 by Andreas Tille at 2021-10-07T09:55:32+02:00
routine-update: Standards-Version: 4.6.0
- - - - -
0d4e2a20 by Andreas Tille at 2021-10-07T09:56:31+02:00
routine-update: Ready to upload to unstable
- - - - -
18 changed files:
- + .github/workflows/codeql-analysis.yml
- .github/workflows/test.yaml
- debian/changelog
- debian/control
- debian/watch
- docs/api.rst
- docs/changelog.rst
- hl7/__init__.py
- hl7/containers.py
- + hl7/exceptions.py
- hl7/mllp/streams.py
- hl7/parser.py
- hl7/util.py
- hl7/version.py
- tests/samples.py
- tests/test_parse.py
- tests/test_util.py
- tox.ini
Changes:
=====================================
.github/workflows/codeql-analysis.yml
=====================================
@@ -0,0 +1,71 @@
+# For most projects, this workflow file will not need changing; you simply need
+# to commit it to your repository.
+#
+# You may wish to alter this file to override the set of languages analyzed,
+# or to provide custom queries or build logic.
+name: "CodeQL"
+
+on:
+ push:
+ branches: [master]
+ pull_request:
+ # The branches below must be a subset of the branches above
+ branches: [master]
+ schedule:
+ - cron: '0 16 * * 1'
+
+jobs:
+ analyze:
+ name: Analyze
+ runs-on: ubuntu-latest
+
+ strategy:
+ fail-fast: false
+ matrix:
+ # Override automatic language detection by changing the below list
+ # Supported options are ['csharp', 'cpp', 'go', 'java', 'javascript', 'python']
+ language: ['python']
+ # Learn more...
+ # https://docs.github.com/en/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#overriding-automatic-language-detection
+
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v2
+ with:
+ # We must fetch at least the immediate parents so that if this is
+ # a pull request then we can checkout the head.
+ fetch-depth: 2
+
+ # If this run was triggered by a pull request event, then checkout
+ # the head of the pull request instead of the merge commit.
+ - run: git checkout HEAD^2
+ if: ${{ github.event_name == 'pull_request' }}
+
+ # Initializes the CodeQL tools for scanning.
+ - name: Initialize CodeQL
+ uses: github/codeql-action/init@v1
+ with:
+ languages: ${{ matrix.language }}
+ # If you wish to specify custom queries, you can do so here or in a config file.
+ # By default, queries listed here will override any specified in a config file.
+ # Prefix the list here with "+" to use these queries and those in the config file.
+ # queries: ./path/to/local/query, your-org/your-repo/queries@main
+
+ # Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
+ # If this step fails, then you should remove it and run the build manually (see below)
+ - name: Autobuild
+ uses: github/codeql-action/autobuild@v1
+
+ # ℹ️ Command-line programs to run using the OS shell..
+ # 📚 https://git.io/JvXDl
+
+ # ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
+ # and modify them (or add more) to build your code if your project
+ # uses a compiled language
+
+ #- run: |
+ # make bootstrap
+ # make release
+
+ - name: Perform CodeQL Analysis
+ uses: github/codeql-action/analyze@v1
=====================================
.github/workflows/test.yaml
=====================================
@@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- python-version: [3.5, 3.6, 3.7, 3.8]
+ python-version: [3.5, 3.6, 3.7, 3.8, 3.9]
steps:
- uses: actions/checkout@v2
=====================================
debian/changelog
=====================================
@@ -1,3 +1,11 @@
+python-hl7 (0.4.2-1) unstable; urgency=medium
+
+ * Fix watchfile to detect new versions on github
+ * New upstream version
+ * Standards-Version: 4.6.0 (routine-update)
+
+ -- Andreas Tille <tille at debian.org> Thu, 07 Oct 2021 09:55:40 +0200
+
python-hl7 (0.4.1-1) unstable; urgency=medium
* New upstream version
=====================================
debian/control
=====================================
@@ -11,7 +11,7 @@ Build-Depends: debhelper-compat (= 13),
python3-setuptools,
python3-six,
python3-mock
-Standards-Version: 4.5.0
+Standards-Version: 4.6.0
Vcs-Browser: https://salsa.debian.org/med-team/python-hl7
Vcs-Git: https://salsa.debian.org/med-team/python-hl7.git
Homepage: https://github.com/johnpaulett/python-hl7/
=====================================
debian/watch
=====================================
@@ -1,5 +1,5 @@
version=4
-https://github.com/johnpaulett/python-hl7/releases .*/archive/@ANY_VERSION@@ARCHIVE_EXT@
+https://github.com/johnpaulett/python-hl7/releases .*/@ANY_VERSION@@ARCHIVE_EXT@
#https://pypi.debian.net/hl7/ hl7-([.\d]+)\.tar\.gz
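The watch file change above drops the literal "/archive/" that had to sit directly in front of the version. A rough Python sketch of the effect follows. The two regex fragments are simplified approximations of what uscan substitutes for @ANY_VERSION@ and @ARCHIVE_EXT@, and both example link paths are assumptions chosen for illustration, not links copied from the release page.

```python
import re

# Simplified stand-ins for uscan's substitution strings (assumption: the
# real expansions documented in the uscan manual may differ in detail).
ANY_VERSION = r"[-_]?(\d[-+.:~\da-zA-Z]*)"
ARCHIVE_EXT = r"\.(?:tar\.xz|tar\.bz2|tar\.gz|zip|tgz|tbz|txz)"

# Old rule: the version must directly follow ".../archive/".
OLD_RULE = re.compile(r".*/archive/" + ANY_VERSION + ARCHIVE_EXT, re.IGNORECASE)
# New rule: the version may follow any path separator.
NEW_RULE = re.compile(r".*/" + ANY_VERSION + ARCHIVE_EXT, re.IGNORECASE)


def matched_version(rule, href):
    """Return the captured version if the whole href matches, else None."""
    found = rule.fullmatch(href)
    return found.group(1) if found else None


# Hypothetical link paths, for illustration only.
flat_link = "/johnpaulett/python-hl7/archive/0.4.1.tar.gz"
nested_link = "/johnpaulett/python-hl7/archive/refs/tags/0.4.2.tar.gz"

for link in (flat_link, nested_link):
    print(link, matched_version(OLD_RULE, link), matched_version(NEW_RULE, link))
```

Under these assumptions the old rule only recognises the flat link, while the new rule extracts a version from both.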
=====================================
docs/api.rst
=====================================
@@ -13,8 +13,16 @@ python-hl7 API
.. autofunction:: hl7.parse
+.. autofunction:: hl7.parse_batch
+
+.. autofunction:: hl7.parse_file
+
+.. autofunction:: hl7.parse_hl7
+
.. autofunction:: hl7.ishl7
+.. autofunction:: hl7.isbatch
+
.. autofunction:: hl7.isfile
.. autofunction:: hl7.split_file
@@ -36,8 +44,14 @@ Data Types
.. autoclass:: hl7.Accessor
:members: __new__, parse_key, key, _replace, _make, _asdict, segment, segment_num, field_num, repeat_num, component_num, subcomponent_num
+.. autoclass:: hl7.Batch
+ :members: __str__, header, trailer, create_header, create_trailer, create_file, create_batch, create_message, create_segment, create_field, create_repetition, create_component
+
+.. autoclass:: hl7.File
+ :members: __str__, header, trailer, create_header, create_trailer, create_file, create_batch, create_message, create_segment, create_field, create_repetition, create_component
+
.. autoclass:: hl7.Message
- :members: segments, segment, __getitem__, __setitem__, escape, unescape, extract_field, assign_field, create_message, create_segment, create_field, create_repetition, create_component, create_ack
+ :members: segments, segment, __getitem__, __setitem__, __str__, escape, unescape, extract_field, assign_field, create_file, create_batch, create_message, create_segment, create_field, create_repetition, create_component, create_ack
.. autoclass:: hl7.Segment
=====================================
docs/changelog.rst
=====================================
@@ -1,6 +1,15 @@
Changelog
=========
+0.4.2 - February 2021
+---------------------
+
+* Added support for :py:class:`hl7.Batch` and :py:class:`hl7.File`, via
+ :py:func:`hl7.parse_hl7` or the more specific :py:func:`hl7.parse_batch`
+ and :py:func:`hl7.parse_file`.
+
+Thanks `Joseph Wortmann <https://github.com/joseph-wortmann>`_!
+
0.4.1 - September 2020
----------------------
=====================================
hl7/__init__.py
=====================================
@@ -7,18 +7,27 @@
"""
from .accessor import Accessor
from .containers import (
+ Batch,
Component,
Container,
Factory,
Field,
+ File,
Message,
Repetition,
Segment,
Sequence,
)
from .datatypes import parse_datetime
-from .parser import parse
-from .util import generate_message_control_id, isfile, ishl7, split_file
+from .exceptions import (
+ HL7Exception,
+ MalformedBatchException,
+ MalformedFileException,
+ MalformedSegmentException,
+ ParseException,
+)
+from .parser import parse, parse_batch, parse_file, parse_hl7
+from .util import generate_message_control_id, isbatch, isfile, ishl7, split_file
from .version import get_version
__version__ = get_version()
@@ -33,8 +42,13 @@ NULL = '""'
__all__ = [
"parse",
+ "parse_hl7",
+ "parse_batch",
+ "parse_file",
"Sequence",
"Container",
+ "File",
+ "Batch",
"Message",
"Segment",
"Field",
@@ -43,8 +57,14 @@ __all__ = [
"Factory",
"Accessor",
"ishl7",
+ "isbatch",
"isfile",
"split_file",
"generate_message_control_id",
"parse_datetime",
+ "HL7Exception",
+ "MalformedBatchException",
+ "MalformedFileException",
+ "MalformedSegmentException",
+ "ParseException",
]
=====================================
hl7/containers.py
=====================================
@@ -3,6 +3,11 @@ import datetime
import logging
from .accessor import Accessor
+from .exceptions import (
+ MalformedBatchException,
+ MalformedFileException,
+ MalformedSegmentException,
+)
from .util import generate_message_control_id
logger = logging.getLogger(__file__)
@@ -73,20 +78,266 @@ class Container(Sequence):
return self.__getitem__(slice(i, j))
def __str__(self):
- """Join a the child containers into a single string, separated
+ return self.separator.join((str(x) for x in self))
+
+
+class BuilderMixin(object):
+ """Mixin class that allows for the create functions
+ in the top-level container classes
+ """
+
+ def create_file(self, seq):
+ """Create a new :py:class:`hl7.File` compatible with this container"""
+ return self.factory.create_file(
+ self.separators[0],
+ seq,
+ esc=self.esc,
+ separators=self.separators,
+ factory=self.factory,
+ )
+
+ def create_batch(self, seq):
+ """Create a new :py:class:`hl7.Batch` compatible with this container"""
+ return self.factory.create_batch(
+ self.separators[0],
+ seq,
+ esc=self.esc,
+ separators=self.separators,
+ factory=self.factory,
+ )
+
+ def create_message(self, seq):
+ """Create a new :py:class:`hl7.Message` compatible with this container"""
+ return self.factory.create_message(
+ self.separators[0],
+ seq,
+ esc=self.esc,
+ separators=self.separators,
+ factory=self.factory,
+ )
+
+ def create_segment(self, seq):
+ """Create a new :py:class:`hl7.Segment` compatible with this container"""
+ return self.factory.create_segment(
+ self.separators[1],
+ seq,
+ esc=self.esc,
+ separators=self.separators[1:],
+ factory=self.factory,
+ )
+
+ def create_field(self, seq):
+ """Create a new :py:class:`hl7.Field` compatible with this container"""
+ return self.factory.create_field(
+ self.separators[2],
+ seq,
+ esc=self.esc,
+ separators=self.separators[2:],
+ factory=self.factory,
+ )
+
+ def create_repetition(self, seq):
+ """Create a new :py:class:`hl7.Repetition` compatible with this container"""
+ return self.factory.create_repetition(
+ self.separators[3],
+ seq,
+ esc=self.esc,
+ separators=self.separators[3:],
+ factory=self.factory,
+ )
+
+ def create_component(self, seq):
+ """Create a new :py:class:`hl7.Component` compatible with this container"""
+ return self.factory.create_component(
+ self.separators[4],
+ seq,
+ esc=self.esc,
+ separators=self.separators[4:],
+ factory=self.factory,
+ )
+
+
+class File(Container, BuilderMixin):
+ """Representation of an HL7 file from the batch protocol.
+ It contains a list of :py:class:`hl7.Batch`
+ instances. It may contain FHS/FTS :py:class:`hl7.Segment` instances.
+
+ Files may or may not be wrapped in FHS/FTS segments
+ delineating the start/end of the file. These are optional.
+ """
+
+ def __init__(
+ self, separator, sequence=[], esc="\\", separators="\r|~^&", factory=None
+ ):
+ super(File, self).__init__(
+ separator,
+ sequence=sequence,
+ esc=esc,
+ separators=separators,
+ factory=factory,
+ )
+ self.header = None
+ self.trailer = None
+
+ @property
+ def header(self):
+ """FHS :py:class:`hl7.Segment`"""
+ return self._batch_header_segment
+
+ @header.setter
+ def header(self, segment):
+ if segment and segment[0][0] != "FHS":
+ raise MalformedSegmentException('header must begin with "FHS"')
+ self._batch_header_segment = segment
+
+ @property
+ def trailer(self):
+ """FTS :py:class:`hl7.Segment`"""
+ return self._batch_trailer_segment
+
+ @trailer.setter
+ def trailer(self, segment):
+ if segment and segment[0][0] != "FTS":
+ raise MalformedSegmentException('trailer must begin with "FTS"')
+ self._batch_trailer_segment = segment
+
+ def create_header(self):
+ """Create a new :py:class:`hl7.Segment` FHS compatible with this file"""
+ return self.create_segment(
+ [
+ self.create_field(["FHS"]),
+ self.create_field([self.separators[1]]),
+ self.create_field(
+ [
+ self.separators[3]
+ + self.separators[2]
+ + self.esc
+ + self.separators[4]
+ ]
+ ),
+ ]
+ )
+
+ def create_trailer(self):
+ """Create a new :py:class:`hl7.Segment` FTS compatible with this file"""
+ return self.create_segment([self.create_field(["FTS"])])
+
+ def __str__(self):
+ """Join the child batches into a single string, separated
by the self.separator. This method acts recursively, calling
the children's __unicode__ method. Thus ``unicode()`` is the
appropriate method for turning the python-hl7 representation of
HL7 into a standard string.
- >>> str(h) == message
- True
+ If this file has FHS/FTS segments, they will be added to the
+ beginning/end of the returned string.
+ """
+ if (self.header and not self.trailer) or (not self.header and self.trailer):
+ raise MalformedFileException(
+ "Either both header and trailer must be present or neither"
+ )
+ return (
+ super(File, self).__str__()
+ if not self.header
+ else str(self.header)
+ + self.separator
+ + super(File, self).__str__()
+ + str(self.trailer)
+ + self.separator
+ )
+
+
+class Batch(Container, BuilderMixin):
+ """Representation of an HL7 batch from the batch protocol.
+ It contains a list of :py:class:`hl7.Message` instances.
+ It may contain BHS/BTS :py:class:`hl7.Segment` instances.
+
+ Batches may or may not be wrapped in BHS/BTS segments
+ delineating the start/end of the batch. These are optional.
+ """
+
+ def __init__(
+ self, separator, sequence=[], esc="\\", separators="\r|~^&", factory=None
+ ):
+ super(Batch, self).__init__(
+ separator,
+ sequence=sequence,
+ esc=esc,
+ separators=separators,
+ factory=factory,
+ )
+ self.header = None
+ self.trailer = None
+
+ @property
+ def header(self):
+ """BHS :py:class:`hl7.Segment`"""
+ return self._batch_header_segment
+
+ @header.setter
+ def header(self, segment):
+ if segment and segment[0][0] != "BHS":
+ raise MalformedSegmentException('header must begin with "BHS"')
+ self._batch_header_segment = segment
+
+ @property
+ def trailer(self):
+ """BTS :py:class:`hl7.Segment`"""
+ return self._batch_trailer_segment
+
+ @trailer.setter
+ def trailer(self, segment):
+ if segment and segment[0][0] != "BTS":
+ raise MalformedSegmentException('trailer must begin with "BTS"')
+ self._batch_trailer_segment = segment
+
+ def create_header(self):
+ """Create a new :py:class:`hl7.Segment` BHS compatible with this batch"""
+ return self.create_segment(
+ [
+ self.create_field(["BHS"]),
+ self.create_field([self.separators[1]]),
+ self.create_field(
+ [
+ self.separators[3]
+ + self.separators[2]
+ + self.esc
+ + self.separators[4]
+ ]
+ ),
+ ]
+ )
+
+ def create_trailer(self):
+ """Create a new :py:class:`hl7.Segment` BTS compatible with this batch"""
+ return self.create_segment([self.create_field(["BTS"])])
+
+ def __str__(self):
+ """Join the child messages into a single string, separated
+ by the self.separator. This method acts recursively, calling
+ the children's __unicode__ method. Thus ``unicode()`` is the
+ appropriate method for turning the python-hl7 representation of
+ HL7 into a standard string.
+ If this batch has BHS/BTS segments, they will be added to the
+ beginning/end of the returned string.
"""
- return self.separator.join((str(x) for x in self))
+ if (self.header and not self.trailer) or (not self.header and self.trailer):
+ raise MalformedBatchException(
+ "Either both header and trailer must be present or neither"
+ )
+ return (
+ super(Batch, self).__str__()
+ if not self.header
+ else str(self.header)
+ + self.separator
+ + super(Batch, self).__str__()
+ + str(self.trailer)
+ + self.separator
+ )
-class Message(Container):
+class Message(Container, BuilderMixin):
"""Representation of an HL7 message. It contains a list
of :py:class:`hl7.Segment` instances.
"""
@@ -484,56 +735,6 @@ class Message(Container):
return "".join(rv)
- def create_message(self, seq):
- """Create a new :py:class:`hl7.Message` compatible with this message"""
- return self.factory.create_message(
- self.separators[0],
- seq,
- esc=self.esc,
- separators=self.separators,
- factory=self.factory,
- )
-
- def create_segment(self, seq):
- """Create a new :py:class:`hl7.Segment` compatible with this message"""
- return self.factory.create_segment(
- self.separators[1],
- seq,
- esc=self.esc,
- separators=self.separators[1:],
- factory=self.factory,
- )
-
- def create_field(self, seq):
- """Create a new :py:class:`hl7.Field` compatible with this message"""
- return self.factory.create_field(
- self.separators[2],
- seq,
- esc=self.esc,
- separators=self.separators[2:],
- factory=self.factory,
- )
-
- def create_repetition(self, seq):
- """Create a new :py:class:`hl7.Repetition` compatible with this message"""
- return self.factory.create_repetition(
- self.separators[3],
- seq,
- esc=self.esc,
- separators=self.separators[3:],
- factory=self.factory,
- )
-
- def create_component(self, seq):
- """Create a new :py:class:`hl7.Component` compatible with this message"""
- return self.factory.create_component(
- self.separators[4],
- seq,
- esc=self.esc,
- separators=self.separators[4:],
- factory=self.factory,
- )
-
def create_ack(
self, ack_code="AA", message_id=None, application=None, facility=None
):
@@ -595,6 +796,16 @@ class Message(Container):
return ack
def __str__(self):
+ """Join the child containers into a single string, separated
+ by the self.separator. This method acts recursively, calling
+ the children's __unicode__ method. Thus ``unicode()`` is the
+ appropriate method for turning the python-hl7 representation of
+ HL7 into a standard string.
+
+ >>> str(hl7.parse(message)) == message
+ True
+
+ """
# Per spec, Message Construction Rules, Section 2.6 (v2.8), Message ends
# with the carriage return
return super(Message, self).__str__() + self.separator
@@ -612,7 +823,7 @@ class Segment(Container):
return index
def __str__(self):
- if str(self[0]) in ["MSH", "FHS"]:
+ if str(self[0]) in ["MSH", "FHS", "BHS"]:
return (
str(self[0])
+ str(self[1])
@@ -648,6 +859,8 @@ class Factory(object):
A subclass can be used to create specialized subclasses of each container.
"""
+ create_file = File #: Create an instance of :py:class:`hl7.File`
+ create_batch = Batch #: Create an instance of :py:class:`hl7.Batch`
create_message = Message #: Create an instance of :py:class:`hl7.Message`
create_segment = Segment #: Create an instance of :py:class:`hl7.Segment`
create_field = Field #: Create an instance of :py:class:`hl7.Field`
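The __str__ methods added to File and Batch above share one rule: a header and a trailer must either both be present or both be absent, and when present they wrap the joined children. A minimal standalone sketch of that rule follows. It works on plain strings rather than hl7 containers, and the names render_wrapped and MalformedWrapperError are hypothetical, not part of python-hl7.

```python
class MalformedWrapperError(Exception):
    """Hypothetical stand-in for MalformedBatchException / MalformedFileException."""


def render_wrapped(header, body, trailer, separator="\r"):
    """Serialise an already-joined body with an optional header/trailer pair.

    `body` stands for the joined child containers; `header` and `trailer`
    are plain strings here (assumption: the real code holds Segment objects).
    """
    # Exactly one of the two being set is an error, as in the diff.
    if bool(header) != bool(trailer):
        raise MalformedWrapperError(
            "Either both header and trailer must be present or neither"
        )
    if not header:
        return body
    return header + separator + body + trailer + separator


wrapped = render_wrapped("BHS|^~\\&", "MSH|^~\\&|A\rPID|1\r", "BTS|1")
bare = render_wrapped(None, "MSH|^~\\&|A\rPID|1\r", None)
```

Passing only one of header or trailer raises the error instead of emitting a half-wrapped string.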
=====================================
hl7/exceptions.py
=====================================
@@ -0,0 +1,18 @@
+class HL7Exception(Exception):
+ pass
+
+
+class MalformedSegmentException(HL7Exception):
+ pass
+
+
+class MalformedBatchException(HL7Exception):
+ pass
+
+
+class MalformedFileException(HL7Exception):
+ pass
+
+
+class ParseException(HL7Exception):
+ pass
=====================================
hl7/mllp/streams.py
=====================================
@@ -248,7 +248,8 @@ class HL7StreamReader(MLLPStreamReader):
@encoding.setter
def encoding(self, encoding):
- assert not encoding or isinstance(encoding, str)
+ if encoding and not isinstance(encoding, str):
+ raise TypeError("encoding must be a str or None")
self._encoding = encoding or "ascii"
@property
@@ -257,7 +258,8 @@ class HL7StreamReader(MLLPStreamReader):
@encoding_errors.setter
def encoding_errors(self, encoding_errors):
- assert not encoding_errors or isinstance(encoding_errors, str)
+ if encoding_errors and not isinstance(encoding_errors, str):
+ raise TypeError("encoding_errors must be a str or None")
self._encoding_errors = encoding_errors or "strict"
async def readmessage(self):
@@ -291,7 +293,8 @@ class HL7StreamWriter(MLLPStreamWriter):
@encoding.setter
def encoding(self, encoding):
- assert not encoding or isinstance(encoding, str)
+ if encoding and not isinstance(encoding, str):
+ raise TypeError("encoding must be a str or None")
self._encoding = encoding or "ascii"
@property
@@ -300,7 +303,8 @@ class HL7StreamWriter(MLLPStreamWriter):
@encoding_errors.setter
def encoding_errors(self, encoding_errors):
- assert not encoding_errors or isinstance(encoding_errors, str)
+ if encoding_errors and not isinstance(encoding_errors, str):
+ raise TypeError("encoding_errors must be a str or None")
self._encoding_errors = encoding_errors or "strict"
def writemessage(self, message):
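The four hunks above replace assert statements with explicit TypeError checks. One practical difference is that assert statements are skipped when Python runs with -O, while an explicit raise always executes. A minimal sketch of the resulting setter pattern, using a hypothetical EncodingHolder class rather than the real stream classes:

```python
class EncodingHolder:
    """Hypothetical class mirroring the validated setter from the diff."""

    def __init__(self, encoding=None):
        self.encoding = encoding  # goes through the setter below

    @property
    def encoding(self):
        return self._encoding

    @encoding.setter
    def encoding(self, encoding):
        # Explicit check: still enforced under "python -O", unlike assert.
        if encoding and not isinstance(encoding, str):
            raise TypeError("encoding must be a str or None")
        # Falsy values (None, "") fall back to the default.
        self._encoding = encoding or "ascii"


default_holder = EncodingHolder()
latin_holder = EncodingHolder("latin1")
```

Callers can now catch TypeError specifically instead of relying on AssertionError.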
=====================================
hl7/parser.py
=====================================
@@ -1,8 +1,63 @@
# -*- coding: utf-8 -*-
+from string import whitespace
+
from .containers import Factory
+from .exceptions import ParseException
+from .util import isbatch, isfile, ishl7
+
+_HL7_WHITESPACE = whitespace.replace("\r", "")
+
+
+def parse_hl7(line, encoding="utf-8", factory=Factory):
+ """Returns an instance of the :py:class:`hl7.Message`, :py:class:`hl7.Batch`
+ or :py:class:`hl7.File` that allows indexed access to the data elements or
+ messages or batches respectively.
+
+ A custom :py:class:`hl7.Factory` subclass can be passed in to be used when
+ constructing the message/batch/file and its components.
+
+ .. note::
+
+ HL7 usually contains only ASCII, but can use other character
+ sets (HL7 Standards Document, Section 1.7.1), however as of v2.8,
+ UTF-8 is the preferred character set [#]_.
+ python-hl7 works on Python unicode strings. :py:func:`hl7.parse_hl7`
+ will accept unicode string or will attempt to convert bytestrings
+ into unicode strings using the optional ``encoding`` parameter.
+ ``encoding`` defaults to UTF-8, so no work is needed for bytestrings
+ in UTF-8, but for other character sets like 'cp1252' or 'latin1',
+ ``encoding`` must be set appropriately.
+
+ >>> h = hl7.parse_hl7(message)
+
+ To decode a non-UTF-8 byte string::
+
+ hl7.parse_hl7(message, encoding='latin1')
+
+ :rtype: :py:class:`hl7.Message` | :py:class:`hl7.Batch` | :py:class:`hl7.File`
+
+ .. [#] http://wiki.hl7.org/index.php?title=Character_Set_used_in_v2_messages
-def parse(line, encoding="utf-8", factory=Factory):
+ """
+ # Ensure we are working with unicode data, decode the bytestring
+ # if needed
+ if isinstance(line, bytes):
+ line = line.decode(encoding)
+ # If it is an HL7 message, parse as normal
+ if ishl7(line):
+ return parse(line, encoding=encoding, factory=factory)
+ # If we have a batch, then parse the batch
+ elif isbatch(line):
+ return parse_batch(line, encoding=encoding, factory=factory)
+ # If we have a file, parse the HL7 file
+ elif isfile(line):
+ return parse_file(line, encoding=encoding, factory=factory)
+ # Not an HL7 message
+ raise ValueError("line is not HL7")
+
+
+def parse(lines, encoding="utf-8", factory=Factory):
"""Returns an instance of the :py:class:`hl7.Message` that allows
indexed access to the data elements.
@@ -35,16 +90,211 @@ def parse(line, encoding="utf-8", factory=Factory):
"""
# Ensure we are working with unicode data, decode the bytestring
# if needed
- if isinstance(line, bytes):
- line = line.decode(encoding)
+ if isinstance(lines, bytes):
+ lines = lines.decode(encoding)
# Strip out unnecessary whitespace
- strmsg = line.strip()
+ strmsg = lines.strip()
# The method for parsing the message
plan = create_parse_plan(strmsg, factory)
# Start spliting the methods based upon the ParsePlan
return _split(strmsg, plan)
+def _create_batch(batch, messages, encoding, factory):
+ """Creates a :py:class:`hl7.Batch`
+ """
+ kwargs = {
+ "separator": "\r",
+ "sequence": [
+ parse(message, encoding=encoding, factory=factory) for message in messages
+ ],
+ }
+ # If the BHS/BTS were present, use those to set up the batch
+ # otherwise default
+ if batch:
+ batch = parse(batch, encoding=encoding, factory=factory)
+ kwargs["esc"] = batch.esc
+ kwargs["separators"] = batch.separators
+ kwargs["factory"] = batch.factory
+ parsed = factory.create_batch(**kwargs)
+ # If the BHS/BTS were present then set them
+ if batch:
+ parsed.header = batch.segment("BHS")
+ try:
+ parsed.trailer = batch.segment("BTS")
+ except KeyError:
+ parsed.trailer = parsed.create_segment([parsed.create_field(["BTS"])])
+ return parsed
+
+
+def parse_batch(lines, encoding="utf-8", factory=Factory):
+ """Returns an instance of a :py:class:`hl7.Batch`
+ that allows indexed access to the messages.
+
+ A custom :py:class:`hl7.Factory` subclass can be passed in to be used when
+ constructing the batch and its components.
+
+ .. note::
+
+ HL7 usually contains only ASCII, but can use other character
+ sets (HL7 Standards Document, Section 1.7.1), however as of v2.8,
+ UTF-8 is the preferred character set [#]_.
+
+ python-hl7 works on Python unicode strings. :py:func:`hl7.parse_batch`
+ will accept unicode string or will attempt to convert bytestrings
+ into unicode strings using the optional ``encoding`` parameter.
+ ``encoding`` defaults to UTF-8, so no work is needed for bytestrings
+ in UTF-8, but for other character sets like 'cp1252' or 'latin1',
+ ``encoding`` must be set appropriately.
+
+ >>> h = hl7.parse_batch(message)
+
+ To decode a non-UTF-8 byte string::
+
+ hl7.parse_batch(message, encoding='latin1')
+
+ :rtype: :py:class:`hl7.Batch`
+
+ .. [#] http://wiki.hl7.org/index.php?title=Character_Set_used_in_v2_messages
+
+ """
+ # Ensure we are working with unicode data, decode the bytestring
+ # if needed
+ if isinstance(lines, bytes):
+ lines = lines.decode(encoding)
+ batch = None
+ messages = []
+ # Split the batch into lines, retaining the ends
+ for line in lines.strip(_HL7_WHITESPACE).splitlines(keepends=True):
+ # strip out all whitespace MINUS the '\r'
+ line = line.strip(_HL7_WHITESPACE)
+ if line[:3] == "BHS":
+ if batch:
+ raise ParseException("Batch cannot have more than one BHS segment")
+ batch = line
+ elif line[:3] == "BTS":
+ if not batch or "\rBTS" in batch:
+ continue
+ batch += line
+ elif line[:3] == "MSH":
+ messages.append(line)
+ else:
+ if not messages:
+ raise ParseException(
+ "Segment received before message header {}".format(line)
+ )
+ messages[-1] += line
+ return _create_batch(batch, messages, encoding, factory)
+
+
+def _create_file(file, batches, encoding, factory):
+ kwargs = {
+ "separator": "\r",
+ "sequence": [
+ _create_batch(batch[0], batch[1], encoding, factory) for batch in batches
+ ],
+ }
+ # If the FHS/FTS are present, use them to set up the file
+ if file:
+ file = parse(file, encoding=encoding, factory=factory)
+ kwargs["esc"] = file.esc
+ kwargs["separators"] = file.separators
+ kwargs["factory"] = file.factory
+ parsed = factory.create_file(**kwargs)
+ # If the FHS/FTS are present, add them
+ if file:
+ parsed.header = file.segment("FHS")
+ try:
+ parsed.trailer = file.segment("FTS")
+ except KeyError:
+ parsed.trailer = parsed.create_segment([parsed.create_field(["FTS"])])
+ return parsed
+
+
+def parse_file(lines, encoding="utf-8", factory=Factory): # noqa: C901
+ """Returns an instance of the :py:class:`hl7.File` that allows
+ indexed access to the batches.
+
+ A custom :py:class:`hl7.Factory` subclass can be passed in to be used when
+ constructing the file and its components.
+
+ .. note::
+
+ HL7 usually contains only ASCII, but can use other character
+ sets (HL7 Standards Document, Section 1.7.1), however as of v2.8,
+ UTF-8 is the preferred character set [#]_.
+
+ python-hl7 works on Python unicode strings. :py:func:`hl7.parse_file`
+ will accept unicode string or will attempt to convert bytestrings
+ into unicode strings using the optional ``encoding`` parameter.
+ ``encoding`` defaults to UTF-8, so no work is needed for bytestrings
+ in UTF-8, but for other character sets like 'cp1252' or 'latin1',
+ ``encoding`` must be set appropriately.
+
+ >>> h = hl7.parse_file(message)
+
+ To decode a non-UTF-8 byte string::
+
+ hl7.parse_file(message, encoding='latin1')
+
+ :rtype: :py:class:`hl7.File`
+
+ .. [#] http://wiki.hl7.org/index.php?title=Character_Set_used_in_v2_messages
+
+ """
+ # Ensure we are working with unicode data, decode the bytestring
+ # if needed
+ if isinstance(lines, bytes):
+ lines = lines.decode(encoding)
+ file = None
+ batches = []
+ messages = []
+ in_batch = False
+ # Split the file into lines, retaining the ends
+ for line in lines.strip(_HL7_WHITESPACE).splitlines(keepends=True):
+ # strip out all whitespace MINUS the '\r'
+ line = line.strip(_HL7_WHITESPACE)
+ if line[:3] == "FHS":
+ if file:
+ raise ParseException("File cannot have more than one FHS segment")
+ file = line
+ elif line[:3] == "FTS":
+ if not file or "\rFTS" in file:
+ continue
+ file += line
+ elif line[:3] == "BHS":
+ if in_batch:
+ raise ParseException("Batch cannot have more than one BHS segment")
+ batches.append([line, []])
+ in_batch = True
+ elif line[:3] == "BTS":
+ if not in_batch:
+ continue
+ batches[-1][0] += line
+ in_batch = False
+ elif line[:3] == "MSH":
+ if in_batch:
+ batches[-1][1].append(line)
+ else: # Messages outside of a batch go into the "default" batch
+ messages.append(line)
+ else:
+ if in_batch:
+ if not batches[-1][1]:
+ raise ParseException(
+ "Segment received before message header {}".format(line)
+ )
+ batches[-1][1][-1] += line
+ else:
+ if not messages:
+ raise ParseException(
+ "Segment received before message header {}".format(line)
+ )
+ messages[-1] += line
+ if messages: # add the default batch, if we have one
+ batches.append([None, messages])
+ return _create_file(file, batches, encoding, factory)
+
+
def _split(text, plan):
"""Recursive function to split the *text* into an n-deep list,
according to the :py:class:`hl7._ParsePlan`.
@@ -58,7 +308,11 @@ def _split(text, plan):
# Parsing of the first segment is awkward because it contains
# the separator characters in a field
- if plan.containers[0] == plan.factory.create_segment and text[:3] in ["MSH", "FHS"]:
+ if plan.containers[0] == plan.factory.create_segment and text[:3] in [
+ "MSH",
+ "BHS",
+ "FHS",
+ ]:
seg = text[:3]
sep0 = text[3]
sep_end_off = text.find(sep0, 4)
@@ -87,7 +341,10 @@ def create_parse_plan(strmsg, factory=Factory):
separators = ["\r"]
# Extract the rest of the separators. Defaults used if not present.
- assert strmsg[:3] in ("MSH")
+ if strmsg[:3] not in ("MSH", "FHS", "BHS"):
+ raise ParseException(
+ "First segment is {}, must be one of MSH, FHS or BHS".format(strmsg[:3])
+ )
sep0 = strmsg[3]
seps = list(strmsg[3 : strmsg.find(sep0, 4)])
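The loop in parse_batch above groups raw lines before any real parsing happens: BHS and BTS lines accumulate in one string, each MSH starts a new message, and every other segment is appended to the most recent message. A standalone sketch of just that grouping step follows. The names group_batch_lines and GroupingError are hypothetical, and the sketch returns plain strings instead of building hl7 containers.

```python
from string import whitespace

# All whitespace except the carriage return, which HL7 uses as the
# segment terminator (same idea as _HL7_WHITESPACE in the diff).
_HL7_WHITESPACE = whitespace.replace("\r", "")


class GroupingError(Exception):
    """Hypothetical stand-in for hl7.ParseException."""


def group_batch_lines(text):
    """Return (batch_wrapper, messages) as raw strings."""
    batch = None
    messages = []
    for line in text.strip(_HL7_WHITESPACE).splitlines(keepends=True):
        line = line.strip(_HL7_WHITESPACE)
        tag = line[:3]
        if tag == "BHS":
            if batch:
                raise GroupingError("Batch cannot have more than one BHS segment")
            batch = line
        elif tag == "BTS":
            # Ignore a trailer without a header, and any second trailer.
            if not batch or "\rBTS" in batch:
                continue
            batch += line
        elif tag == "MSH":
            messages.append(line)
        else:
            if not messages:
                raise GroupingError(
                    "Segment received before message header {}".format(line)
                )
            messages[-1] += line
    return batch, messages


sample = "BHS|^~\\&\rMSH|^~\\&|A\rPID|1\rMSH|^~\\&|B\rPID|2\rBTS|2\r"
wrapper, grouped = group_batch_lines(sample)
```

In the real parser the wrapper string and each message string are then handed to parse(), which is why BHS had to be accepted by create_parse_plan and _split.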
=====================================
hl7/util.py
=====================================
@@ -15,16 +15,29 @@ def ishl7(line):
:rtype: bool
"""
# Prevent issues if the line is empty
- return line and (line.strip()[:3] in ["MSH"]) or False
+ return line and line.strip()[:3] == "MSH" and line.count("MSH") == 1
+
+
+def isbatch(line):
+ """
+ Batches are wrapped in BHS / BTS or have more than one
+ message
+ BHS = batch header segment
+ BTS = batch trailer segment
+ """
+ return line and (
+ line.strip()[:3] == "BHS"
+ or (line.count("MSH") > 1 and line.strip()[:3] != "FHS")
+ )
def isfile(line):
"""
- Files are wrapped in FHS / FTS
+ Files are wrapped in FHS / FTS, or may be a batch
FHS = file header segment
FTS = file trailer segment
"""
- return line and (line.strip()[:3] in ["FHS"]) or False
+ return line and (line.strip()[:3] == "FHS" or isbatch(line))
def split_file(hl7file):
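The three predicates above decide which parser parse_hl7 dispatches to, and they are checked in the order message, batch, file. The sketch below copies their logic into a standalone form to show how sample inputs are classified. The bool() wrappers and the classify helper are additions for illustration; the functions in the diff return the raw and/or result.

```python
def ishl7(line):
    # A single message: starts with MSH and contains exactly one MSH.
    return bool(line) and line.strip()[:3] == "MSH" and line.count("MSH") == 1


def isbatch(line):
    # A batch: starts with BHS, or holds several messages without an FHS wrapper.
    return bool(line) and (
        line.strip()[:3] == "BHS"
        or (line.count("MSH") > 1 and line.strip()[:3] != "FHS")
    )


def isfile(line):
    # A file: starts with FHS, or is anything that already counts as a batch.
    return bool(line) and (line.strip()[:3] == "FHS" or isbatch(line))


def classify(text):
    """Hypothetical helper mirroring the dispatch order in parse_hl7."""
    if ishl7(text):
        return "message"
    if isbatch(text):
        return "batch"
    if isfile(text):
        return "file"
    raise ValueError("line is not HL7")


single = "MSH|^~\\&|A\rPID|1\r"
two_messages = single + "MSH|^~\\&|B\rPID|2\r"
wrapped_batch = "BHS|^~\\&\r" + single + "BTS|1\r"
wrapped_file = "FHS|^~\\&\r" + wrapped_batch + "FTS|1\r"
```

Because the batch check runs before the file check, two bare messages with no BHS or FHS wrapper are treated as a batch, not as a file.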
=====================================
hl7/version.py
=====================================
@@ -6,7 +6,7 @@ Primary version number source.
Fourth element can be 'dev' < 'a' < 'b' < 'rc' < 'final'. An empty 4th
element is equivalent to 'final'.
"""
-VERSION = (0, 4, 1, "final")
+VERSION = (0, 4, 2, "final")
def get_version():
=====================================
tests/samples.py
=====================================
@@ -21,6 +21,114 @@ rep_sample_hl7 = "\r".join(
]
)
+# Source: http://www.health.vic.gov.au/hdss/vinah/2006-07/appendix-a-sample-messages.pdf
+sample_batch = "\r".join(
+ [
+ "BHS|^~\\&||ABCHS||AUSDHSV|20070101123401||||abchs20070101123401-1",
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778890|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "BTS|1",
+ "",
+ ]
+)
+
+sample_batch1 = "\r".join(
+ [
+ "BHS|^~\\&||ABCHS||AUSDHSV|20070101123401||||abchs20070101123401-1",
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778890|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778891|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "BTS|2",
+ "",
+ ]
+)
+
+sample_batch2 = "\r".join(
+ [
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778890|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778891|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "",
+ ]
+)
+
+sample_batch3 = "\r".join(
+ [
+ "BHS|^~\\&||ABCHS||AUSDHSV|20070101123401||||abchs20070101123401-1",
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778890|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "",
+ ]
+)
+
+
+sample_batch4 = "\r".join(
+ [
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778890|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "BTS|1",
+ "",
+ ]
+)
+
+sample_bad_batch = "\r".join(
+ [
+ "BHS|^~\\&||ABCHS||AUSDHSV|20070101123401||||abchs20070101123401-1",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "BTS|1",
+ "",
+ ]
+)
+
+sample_bad_batch1 = "\r".join(
+ [
+ "BHS|^~\\&||ABCHS||AUSDHSV|20070101123401||||abchs20070101123401-1",
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778890|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "BHS|^~\\&||ABCHS||AUSDHSV|20070101123402||||abchs20070101123401-1",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "BTS|1",
+ "",
+ ]
+)
+
# Source: http://www.health.vic.gov.au/hdss/vinah/2006-07/appendix-a-sample-messages.pdf
sample_file = "\r".join(
[
@@ -37,3 +145,175 @@ sample_file = "\r".join(
"",
]
)
+
+sample_file1 = "\r".join(
+ [
+ "FHS|^~\\&||ABCHS||AUSDHSV|20070101123401|||abchs20070101123401.hl7|",
+ "BHS|^~\\&||ABCHS||AUSDHSV|20070101123401||||abchs20070101123401-1",
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778890|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778891|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "BTS|2",
+ "BHS|^~\\&||ABCHS||AUSDHSV|20070101123401||||abchs20070101123401-2",
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778890|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "BTS|1",
+ "FTS|2",
+ "",
+ ]
+)
+
+sample_file2 = "\r".join(
+ [
+ "FHS|^~\\&||ABCHS||AUSDHSV|20070101123401|||abchs20070101123401.hl7|",
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778890|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778891|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "FTS|1",
+ "",
+ ]
+)
+
+sample_file3 = "\r".join(
+ [
+ "FHS|^~\\&||ABCHS||AUSDHSV|20070101123401|||abchs20070101123401.hl7|",
+ "BHS|^~\\&||ABCHS||AUSDHSV|20070101123401||||abchs20070101123401-1",
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778890|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "BTS|1",
+ "",
+ ]
+)
+
+sample_file4 = "\r".join(
+ [
+ "BHS|^~\\&||ABCHS||AUSDHSV|20070101123401||||abchs20070101123401-1",
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778890|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "BTS|1",
+ "FTS|1",
+ "",
+ ]
+)
+
+sample_file5 = "\r".join(
+ [
+ "FHS|^~\\&||ABCHS||AUSDHSV|20070101123401|||abchs20070101123401.hl7|",
+ "BHS|^~\\&||ABCHS||AUSDHSV|20070101123401||||abchs20070101123401-1",
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778890|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "FTS|1",
+ "",
+ ]
+)
+
+sample_file6 = "\r".join(
+ [
+ "FHS|^~\\&||ABCHS||AUSDHSV|20070101123401|||abchs20070101123401.hl7|",
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778890|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "BTS|1",
+ "FTS|1",
+ "",
+ ]
+)
+
+sample_bad_file = "\r".join(
+ [
+ "FHS|^~\\&||ABCHS||AUSDHSV|20070101123401|||abchs20070101123401.hl7|",
+ "BHS|^~\\&||ABCHS||AUSDHSV|20070101123401||||abchs20070101123401-1",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "BTS|1",
+ "FTS|1",
+ "",
+ ]
+)
+
+sample_bad_file1 = "\r".join(
+ [
+ "FHS|^~\\&||ABCHS||AUSDHSV|20070101123401|||abchs20070101123401.hl7|",
+ "BHS|^~\\&||ABCHS||AUSDHSV|20070101123401||||abchs20070101123401-1",
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778890|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "BHS|^~\\&||ABCHS||AUSDHSV|20070101123402||||abchs20070101123401-1",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "BTS|1",
+ "FTS|1",
+ "",
+ ]
+)
+
+sample_bad_file2 = "\r".join(
+ [
+ "FHS|^~\\&||ABCHS||AUSDHSV|20070101123401|||abchs20070101123401.hl7|",
+ "BHS|^~\\&||ABCHS||AUSDHSV|20070101123401||||abchs20070101123401-1",
+ "MSH|^~\\&||ABCHS||AUSDHSV|20070101112951||ADT^A04^ADT_A01|12334456778890|P|2.5|||NE|NE|AU|ASCII",
+ "EVN|A04|20060705000000",
+ "FHS|^~\\&||ABCHS||AUSDHSV|20070101123402|||abchs20070101123401.hl7|",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "BTS|1",
+ "FTS|1",
+ "",
+ ]
+)
+
+sample_bad_file3 = "\r".join(
+ [
+ "FHS|^~\\&||ABCHS||AUSDHSV|20070101123401|||abchs20070101123401.hl7|",
+ "EVN|A04|20060705000000",
+ "PID|1||0000112234^^^100^A||XXXXXXXXXX^^^^^^S||10131113|1||4|^^RICHMOND^^3121||||1201||||||||1100|||||||||AAA",
+ "PD1||2",
+ "NK1|1||1||||||||||||||||||2",
+ "PV1|1|O||||^^^^^1",
+ "FTS|1",
+ "",
+ ]
+)
=====================================
tests/test_parse.py
=====================================
@@ -2,9 +2,30 @@
from unittest import TestCase
import hl7
-from hl7 import Accessor, Component, Field, Message, Repetition, Segment
-
-from .samples import rep_sample_hl7, sample_file, sample_hl7
+from hl7 import Accessor, Component, Field, Message, ParseException, Repetition, Segment
+
+from .samples import (
+ rep_sample_hl7,
+ sample_bad_batch,
+ sample_bad_batch1,
+ sample_bad_file,
+ sample_bad_file1,
+ sample_bad_file2,
+ sample_bad_file3,
+ sample_batch,
+ sample_batch1,
+ sample_batch2,
+ sample_batch3,
+ sample_batch4,
+ sample_file,
+ sample_file1,
+ sample_file2,
+ sample_file3,
+ sample_file4,
+ sample_file5,
+ sample_file6,
+ sample_hl7,
+)
class ParseTest(TestCase):
@@ -29,6 +50,186 @@ class ParseTest(TestCase):
self.assertEqual(str(msg), sample_hl7)
self.assertEqual(str(msg), sample_hl7)
+ def test_parse_batch(self):
+ batch = hl7.parse_batch(sample_batch)
+ self.assertEqual(len(batch), 1)
+ self.assertIsInstance(batch[0], hl7.Message)
+ self.assertIsInstance(batch.header, hl7.Segment)
+ self.assertEqual(batch.header[0][0], "BHS")
+ self.assertEqual(batch.header[4][0], "ABCHS")
+ self.assertIsInstance(batch.trailer, hl7.Segment)
+ self.assertEqual(batch.trailer[0][0], "BTS")
+ self.assertEqual(batch.trailer[1][0], "1")
+
+ def test_parse_batch1(self):
+ batch = hl7.parse_batch(sample_batch1)
+ self.assertEqual(len(batch), 2)
+ self.assertIsInstance(batch[0], hl7.Message)
+ self.assertEqual(batch[0][0][10][0], "12334456778890")
+ self.assertIsInstance(batch[1], hl7.Message)
+ self.assertEqual(batch[1][0][10][0], "12334456778891")
+ self.assertIsInstance(batch.header, hl7.Segment)
+ self.assertEqual(batch.header[0][0], "BHS")
+ self.assertEqual(batch.header[4][0], "ABCHS")
+ self.assertIsInstance(batch.trailer, hl7.Segment)
+ self.assertEqual(batch.trailer[0][0], "BTS")
+ self.assertEqual(batch.trailer[1][0], "2")
+
+ def test_parse_batch2(self):
+ batch = hl7.parse_batch(sample_batch2)
+ self.assertEqual(len(batch), 2)
+ self.assertIsInstance(batch[0], hl7.Message)
+ self.assertEqual(batch[0][0][10][0], "12334456778890")
+ self.assertIsInstance(batch[1], hl7.Message)
+ self.assertEqual(batch[1][0][10][0], "12334456778891")
+ self.assertFalse(batch.header)
+ self.assertFalse(batch.trailer)
+
+ def test_parse_batch3(self):
+ batch = hl7.parse_batch(sample_batch3)
+ self.assertEqual(len(batch), 1)
+ self.assertIsInstance(batch[0], hl7.Message)
+ self.assertIsInstance(batch.header, hl7.Segment)
+ self.assertEqual(batch.header[0][0], "BHS")
+ self.assertEqual(batch.header[4][0], "ABCHS")
+ self.assertIsInstance(batch.trailer, hl7.Segment)
+ self.assertEqual(batch.trailer[0][0], "BTS")
+
+ def test_parse_batch4(self):
+ batch = hl7.parse_batch(sample_batch4)
+ self.assertEqual(len(batch), 1)
+ self.assertIsInstance(batch[0], hl7.Message)
+ self.assertIsNone(batch.header)
+ self.assertIsNone(batch.trailer)
+
+ def test_parse_bad_batch(self):
+ with self.assertRaises(ParseException) as cm:
+ hl7.parse_batch(sample_bad_batch)
+ self.assertIn("Segment received before message header", cm.exception.args[0])
+
+ def test_parse_bad_batch1(self):
+ with self.assertRaises(ParseException) as cm:
+ hl7.parse_batch(sample_bad_batch1)
+ self.assertIn(
+ "Batch cannot have more than one BHS segment", cm.exception.args[0]
+ )
+
+ def test_parse_file(self):
+ file = hl7.parse_file(sample_file)
+ self.assertEqual(len(file), 1)
+ self.assertIsInstance(file[0], hl7.Batch)
+ self.assertIsInstance(file.header, hl7.Segment)
+ self.assertEqual(file.header[0][0], "FHS")
+ self.assertEqual(file.header[4][0], "ABCHS")
+ self.assertIsInstance(file.trailer, hl7.Segment)
+ self.assertEqual(file.trailer[0][0], "FTS")
+ self.assertEqual(file.trailer[1][0], "1")
+
+ def test_parse_file1(self):
+ file = hl7.parse_file(sample_file1)
+ self.assertEqual(len(file), 2)
+ self.assertIsInstance(file[0], hl7.Batch)
+ self.assertEqual(file[0].trailer[1][0], "2")
+ self.assertIsInstance(file[1], hl7.Batch)
+ self.assertEqual(file[1].trailer[1][0], "1")
+ self.assertNotEqual(file[0], file[1])
+ self.assertIsInstance(file.header, hl7.Segment)
+ self.assertEqual(file.header[0][0], "FHS")
+ self.assertEqual(file.header[4][0], "ABCHS")
+ self.assertIsInstance(file.trailer, hl7.Segment)
+ self.assertEqual(file.trailer[0][0], "FTS")
+ self.assertEqual(file.trailer[1][0], "2")
+
+ def test_parse_file2(self):
+ file = hl7.parse_file(sample_file2)
+ self.assertEqual(len(file), 1)
+ self.assertIsInstance(file[0], hl7.Batch)
+ self.assertIsInstance(file.header, hl7.Segment)
+ self.assertEqual(file.header[0][0], "FHS")
+ self.assertEqual(file.header[4][0], "ABCHS")
+ self.assertIsInstance(file.trailer, hl7.Segment)
+ self.assertEqual(file.trailer[0][0], "FTS")
+ self.assertEqual(file.trailer[1][0], "1")
+
+ def test_parse_file3(self):
+ file = hl7.parse_file(sample_file3)
+ self.assertEqual(len(file), 1)
+ self.assertIsInstance(file[0], hl7.Batch)
+ self.assertIsInstance(file.header, hl7.Segment)
+ self.assertEqual(file.header[0][0], "FHS")
+ self.assertEqual(file.header[4][0], "ABCHS")
+ self.assertIsInstance(file.trailer, hl7.Segment)
+ self.assertEqual(file.trailer[0][0], "FTS")
+
+ def test_parse_file4(self):
+ file = hl7.parse_file(sample_file4)
+ self.assertEqual(len(file), 1)
+ self.assertIsInstance(file[0], hl7.Batch)
+ self.assertIsNone(file.header)
+ self.assertIsNone(file.trailer)
+
+ def test_parse_file5(self):
+ file = hl7.parse_file(sample_file5)
+ self.assertEqual(len(file), 1)
+ self.assertIsInstance(file[0], hl7.Batch)
+ self.assertIsInstance(file.header, hl7.Segment)
+ self.assertEqual(file.header[0][0], "FHS")
+ self.assertEqual(file.header[4][0], "ABCHS")
+ self.assertIsInstance(file.trailer, hl7.Segment)
+ self.assertEqual(file.trailer[0][0], "FTS")
+ self.assertEqual(file.trailer[1][0], "1")
+
+ def test_parse_file6(self):
+ file = hl7.parse_file(sample_file6)
+ self.assertEqual(len(file), 1)
+ self.assertIsInstance(file[0], hl7.Batch)
+ self.assertIsInstance(file.header, hl7.Segment)
+ self.assertEqual(file.header[0][0], "FHS")
+ self.assertEqual(file.header[4][0], "ABCHS")
+ self.assertIsInstance(file.trailer, hl7.Segment)
+ self.assertEqual(file.trailer[0][0], "FTS")
+ self.assertEqual(file.trailer[1][0], "1")
+
+ def test_parse_bad_file(self):
+ with self.assertRaises(ParseException) as cm:
+ hl7.parse_file(sample_bad_file)
+ self.assertIn("Segment received before message header", cm.exception.args[0])
+
+ def test_parse_bad_file1(self):
+ with self.assertRaises(ParseException) as cm:
+ hl7.parse_file(sample_bad_file1)
+ self.assertIn(
+ "Batch cannot have more than one BHS segment", cm.exception.args[0]
+ )
+
+ def test_parse_bad_file2(self):
+ with self.assertRaises(ParseException) as cm:
+ hl7.parse_file(sample_bad_file2)
+ self.assertIn(
+ "File cannot have more than one FHS segment", cm.exception.args[0]
+ )
+
+ def test_parse_bad_file3(self):
+ with self.assertRaises(ParseException) as cm:
+ hl7.parse_file(sample_bad_file3)
+ self.assertIn("Segment received before message header", cm.exception.args[0])
+
+ def test_parse_hl7(self):
+ obj = hl7.parse_hl7(sample_hl7)
+ self.assertIsInstance(obj, hl7.Message)
+ obj = hl7.parse_hl7(sample_batch)
+ self.assertIsInstance(obj, hl7.Batch)
+ obj = hl7.parse_hl7(sample_batch1)
+ self.assertIsInstance(obj, hl7.Batch)
+ obj = hl7.parse_hl7(sample_batch2)
+ self.assertIsInstance(obj, hl7.Batch)
+ obj = hl7.parse_hl7(sample_file)
+ self.assertIsInstance(obj, hl7.File)
+ obj = hl7.parse_hl7(sample_file1)
+ self.assertIsInstance(obj, hl7.File)
+ obj = hl7.parse_hl7(sample_file2)
+ self.assertIsInstance(obj, hl7.File)
+
def test_bytestring_converted_to_unicode(self):
msg = hl7.parse(str(sample_hl7))
self.assertEqual(len(msg), 5)
=====================================
tests/test_util.py
=====================================
@@ -3,12 +3,26 @@ from unittest import TestCase
import hl7
-from .samples import sample_file, sample_hl7
+from .samples import (
+ sample_batch,
+ sample_batch1,
+ sample_batch2,
+ sample_file,
+ sample_file1,
+ sample_file2,
+ sample_hl7,
+)
class IsHL7Test(TestCase):
def test_ishl7(self):
self.assertTrue(hl7.ishl7(sample_hl7))
+ self.assertFalse(hl7.ishl7(sample_batch))
+ self.assertFalse(hl7.ishl7(sample_batch1))
+ self.assertFalse(hl7.ishl7(sample_batch2))
+ self.assertFalse(hl7.ishl7(sample_file))
+ self.assertFalse(hl7.ishl7(sample_file1))
+ self.assertFalse(hl7.ishl7(sample_file2))
def test_ishl7_empty(self):
self.assertFalse(hl7.ishl7(""))
@@ -20,6 +34,24 @@ class IsHL7Test(TestCase):
message = "OBX|1|SN|1554-5^GLUCOSE^POST 12H CFST:MCNC:PT:SER/PLAS:QN||^182|mg/dl|70_105|H|||F\r"
self.assertFalse(hl7.ishl7(message))
+ def test_isbatch(self):
+ self.assertFalse(hl7.ishl7(sample_batch))
+ self.assertFalse(hl7.ishl7(sample_batch1))
+ self.assertFalse(hl7.ishl7(sample_batch2))
+ self.assertTrue(hl7.isbatch(sample_batch))
+ self.assertTrue(hl7.isbatch(sample_batch1))
+ self.assertTrue(hl7.isbatch(sample_batch2))
+
def test_isfile(self):
self.assertFalse(hl7.ishl7(sample_file))
+ self.assertFalse(hl7.ishl7(sample_file1))
+ self.assertFalse(hl7.ishl7(sample_file2))
+ self.assertFalse(hl7.isbatch(sample_file))
+ self.assertFalse(hl7.isbatch(sample_file1))
+ self.assertFalse(hl7.isbatch(sample_file2))
self.assertTrue(hl7.isfile(sample_file))
+ self.assertTrue(hl7.isfile(sample_file1))
+ self.assertTrue(hl7.isfile(sample_file2))
+ self.assertTrue(hl7.isfile(sample_batch))
+ self.assertTrue(hl7.isfile(sample_batch1))
+ self.assertTrue(hl7.isfile(sample_batch2))
=====================================
tox.ini
=====================================
@@ -1,6 +1,6 @@
[tox]
envlist =
- py38, py37, py36, py35, docs
+ py39, py38, py37, py36, py35, docs
[testenv]
commands =
@@ -18,6 +18,9 @@ basepython = python3.7
[testenv:py38]
basepython = python3.8
+[testenv:py39]
+basepython = python3.9
+
[testenv:docs]
whitelist_externals = make
deps =
View it on GitLab: https://salsa.debian.org/med-team/python-hl7/-/compare/eab3e56d9fc4491bf83db27ebdddb91978974ae8...0d4e2a2020f748fb45e0dd1312908a8b080838a2
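
For readers skimming the hl7/util.py hunk above: the three predicates overlap on purpose (every batch also counts as a file, which is what the new test_isfile assertions check), so the order in which they are tested matters. The following is a minimal standalone sketch of that logic, not the library's code. The predicates are re-implemented here from the diff with bool() added so they always return True or False, and classify() is a hypothetical helper that does not exist in python-hl7.

```python
# Standalone sketch of the detection rules shown in the hl7/util.py diff.
# Re-implemented for illustration; classify() is a hypothetical helper.


def ishl7(line):
    # A single message: starts with MSH and contains "MSH" exactly once.
    return bool(line) and line.strip()[:3] == "MSH" and line.count("MSH") == 1


def isbatch(line):
    # A batch: wrapped in BHS, or several MSH segments without an FHS wrapper.
    return bool(line) and (
        line.strip()[:3] == "BHS"
        or (line.count("MSH") > 1 and line.strip()[:3] != "FHS")
    )


def isfile(line):
    # A file: wrapped in FHS, or anything that already qualifies as a batch.
    return bool(line) and (line.strip()[:3] == "FHS" or isbatch(line))


def classify(text):
    # Test the most specific wrapper first, because isfile() is also
    # true for batches.
    if text and text.strip()[:3] == "FHS":
        return "file"
    if isbatch(text):
        return "batch"
    if ishl7(text):
        return "message"
    return "unknown"


# Shortened, made-up inputs (not the samples from tests/samples.py).
single = "MSH|^~\\&|A\rPID|1\r"
wrapped_batch = "BHS|^~\\&|A\rMSH|^~\\&|A\rPID|1\rBTS|1\r"
bare_batch = "MSH|^~\\&|A\rPID|1\rMSH|^~\\&|A\rPID|2\r"
wrapped_file = "FHS|^~\\&|A\rMSH|^~\\&|A\rPID|1\rFTS|1\r"
```

Note that line.count("MSH") counts the substring anywhere in the text, so a message whose field data happens to contain "MSH" would be treated as a batch by these rules.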