[med-svn] [Git][med-team/python-hl7][upstream] New upstream version 0.4.5
Andreas Tille (@tille)
gitlab at salsa.debian.org
Tue Jul 19 13:33:46 BST 2022
Andreas Tille pushed to branch upstream at Debian Med / python-hl7
Commits:
e5cedb5a by Andreas Tille at 2022-07-17T16:00:23+02:00
New upstream version 0.4.5
- - - - -
23 changed files:
- .github/workflows/codeql-analysis.yml
- .github/workflows/test.yaml
- Makefile
- docs/accessors.rst
- docs/changelog.rst
- docs/conf.py
- docs/index.rst
- hl7/containers.py
- hl7/mllp/streams.py
- hl7/parser.py
- hl7/util.py
- hl7/version.py
- requirements.txt
- setup.py
- tests/backports/unittest/async_case.py
- tests/backports/unittest/case.py
- tests/samples.py
- tests/test_accessor.py
- tests/test_client.py
- tests/test_construction.py
- tests/test_parse.py
- tests/test_util.py
- tox.ini
Changes:
=====================================
.github/workflows/codeql-analysis.yml
=====================================
@@ -7,10 +7,10 @@ name: "CodeQL"
on:
push:
- branches: [master]
+ branches: [main]
pull_request:
# The branches below must be a subset of the branches above
- branches: [master]
+ branches: [main]
schedule:
- cron: '0 16 * * 1'
=====================================
.github/workflows/test.yaml
=====================================
@@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- python-version: [3.5, 3.6, 3.7, 3.8, 3.9]
+ python-version: [3.7, 3.8, 3.9, "3.10"]
steps:
- uses: actions/checkout at v2
=====================================
Makefile
=====================================
@@ -7,7 +7,7 @@ PIP = $(BIN)/pip
SPHINXBUILD = $(shell pwd)/env/bin/sphinx-build
env: requirements.txt setup.py
- test -f $(PYTHON) || virtualenv env
+ test -f $(PYTHON) || python3 -m venv env
$(PIP) install -U -r requirements.txt
$(PYTHON) setup.py develop
@@ -53,12 +53,12 @@ ISORT_ARGS=--check-only
BLACK_ARGS=--check
endif
format:
- $(BIN)/isort -rc $(ISORT_ARGS) hl7 tests
+ $(BIN)/isort $(ISORT_ARGS) hl7 tests
$(BIN)/black $(BLACK_ARGS) hl7 tests
.PHONY: isort
upload:
rm -rf dist
$(PYTHON) setup.py sdist bdist_wheel
- twine upload dist/*
+ $(BIN)/twine upload dist/*
.PHONY: upload
=====================================
docs/accessors.rst
=====================================
@@ -163,8 +163,8 @@ Create a response message.
>>> SEP = '|^~\&'
>>> CR_SEP = '\r'
- >>> MSH = hl7.Segment(SEP[0], [hl7.Field(SEP[1], ['MSH'])])
- >>> MSA = hl7.Segment(SEP[0], [hl7.Field(SEP[1], ['MSA'])])
+ >>> MSH = hl7.Segment(SEP[0], [hl7.Field(SEP[2], ['MSH'])])
+ >>> MSA = hl7.Segment(SEP[0], [hl7.Field(SEP[2], ['MSA'])])
>>> response = hl7.Message(CR_SEP, [MSH, MSA])
>>> response['MSH.F1.R1'] = SEP[0]
>>> response['MSH.F2.R1'] = SEP[1:]
@@ -255,7 +255,7 @@ The escape method returns a 'str' object. The unescape method returns a str obje
**Presentation Characters**
-HL7 defines a protocol for encoding presentation characters, These include hightlighting,
+HL7 defines a protocol for encoding presentation characters, These include highlighting,
and rich text functionality. The API does not currently allow for easy access to the
escape/unescape logic. You must overwrite the message class escape and unescape methods,
after parsing the message.
=====================================
docs/changelog.rst
=====================================
@@ -1,6 +1,25 @@
Changelog
=========
+0.4.5 - March 2022
+---------------------
+
+* Better support for :py:class:`HL7StreamProtocol` in Python 3.7, which lacks
+ `_reject_connection`
+
+Thanks `Joseph Wortmann <https://github.com/joseph-wortmann>`_!
+
+
+0.4.3 - March 2022
+---------------------
+
+* Dropped support for Python 3.5 & 3.6. Python 3.7 - 3.10 now supported.
+* Ensure :py:func:`hl7.parse_hl7` allows legitimate occurrences of "MSH" inside
+ the message contents
+
+Thanks `Andrew Wason <https://github.com/rectalogic>`_!
+
+
0.4.2 - February 2021
---------------------
=====================================
docs/conf.py
=====================================
@@ -104,6 +104,7 @@ html_theme_options = {
"github_repo": "python-hl7",
"codecov_button": True,
"github_banner": True,
+ "badge_branch": "main",
# "page_width": "940",
}
@@ -247,7 +248,7 @@ epub_copyright = u"2011, John Paulett"
# The format is a list of tuples containing the path and title.
# epub_pre_files = []
-# HTML files shat should be inserted after the pages created by sphinx.
+# HTML files that should be inserted after the pages created by sphinx.
# The format is a list of tuples containing the path and title.
# epub_post_files = []
=====================================
docs/index.rst
=====================================
@@ -9,7 +9,7 @@ server (:ref:`mllp_send <mllp-send>`).
HL7 is a communication protocol and message format for
health care data. It is the de-facto standard for transmitting data
between clinical information systems and between clinical devices.
-The version 2.x series, which is often is a pipe delimited format
+The version 2.x series, which is often in a pipe delimited format,
is currently the most widely accepted version of HL7 (there
is an alternative XML-based format).
@@ -164,7 +164,7 @@ syntax:
True
Since many many types of segments only have a single instance in a message
-(e.g. PID or MSH), :py:meth:`hl7.Message.segment` provides a convienance
+(e.g. PID or MSH), :py:meth:`hl7.Message.segment` provides a convenience
wrapper around :py:meth:`hl7.Message.segments` that returns the first matching
:py:class:`hl7.Segment`:
@@ -233,7 +233,7 @@ libraries can depend upon.
Python 2 vs Python 3 and Unicode vs Byte strings
-------------------------------------------------
-python-hl7 supports Python 3.5+ and primarily deals with the unicode ``str`` type.
+python-hl7 supports Python 3.7+ and primarily deals with the unicode ``str`` type.
Passing bytes to :py:func:`hl7.parse`, requires setting the
``encoding`` parameter, if using anything other than UTF-8. :py:func:`hl7.parse`
=====================================
hl7/containers.py
=====================================
@@ -8,7 +8,7 @@ from .exceptions import (
MalformedFileException,
MalformedSegmentException,
)
-from .util import generate_message_control_id
+from .util import escape, generate_message_control_id, unescape
logger = logging.getLogger(__file__)
@@ -49,6 +49,7 @@ class Container(Sequence):
def __init__(
self, separator, sequence=[], esc="\\", separators="\r|~^&", factory=None
):
+ assert separator in separators
# Initialize the list object, optionally passing in the
# sequence. Since list([]) == [], using the default
# parameter will not cause any issues.
@@ -58,39 +59,10 @@ class Container(Sequence):
self.separators = separators
self.factory = factory if factory is not None else Factory
- def __getitem__(self, item):
- # Python slice operator was returning a regular list, not a
- # Container subclass
- sequence = super(Container, self).__getitem__(item)
- if isinstance(item, slice):
- return self.__class__(
- self.separator,
- sequence,
- self.esc,
- self.separators,
- factory=self.factory,
- )
- return sequence
-
- def __getslice__(self, i, j):
- # Python 2.x compatibility. __getslice__ is deprecated, and
- # we want to wrap the logic from __getitem__ when handling slices
- return self.__getitem__(slice(i, j))
-
- def __str__(self):
- return self.separator.join((str(x) for x in self))
-
-
-class BuilderMixin(object):
- """Mixin class that allows for the create functions
- in the top-level container classes
- """
-
def create_file(self, seq):
"""Create a new :py:class:`hl7.File` compatible with this container"""
return self.factory.create_file(
- self.separators[0],
- seq,
+ sequence=seq,
esc=self.esc,
separators=self.separators,
factory=self.factory,
@@ -99,8 +71,7 @@ class BuilderMixin(object):
def create_batch(self, seq):
"""Create a new :py:class:`hl7.Batch` compatible with this container"""
return self.factory.create_batch(
- self.separators[0],
- seq,
+ sequence=seq,
esc=self.esc,
separators=self.separators,
factory=self.factory,
@@ -109,8 +80,7 @@ class BuilderMixin(object):
def create_message(self, seq):
"""Create a new :py:class:`hl7.Message` compatible with this container"""
return self.factory.create_message(
- self.separators[0],
- seq,
+ sequence=seq,
esc=self.esc,
separators=self.separators,
factory=self.factory,
@@ -119,45 +89,63 @@ class BuilderMixin(object):
def create_segment(self, seq):
"""Create a new :py:class:`hl7.Segment` compatible with this container"""
return self.factory.create_segment(
- self.separators[1],
- seq,
+ sequence=seq,
esc=self.esc,
- separators=self.separators[1:],
+ separators=self.separators,
factory=self.factory,
)
def create_field(self, seq):
"""Create a new :py:class:`hl7.Field` compatible with this container"""
return self.factory.create_field(
- self.separators[2],
- seq,
+ sequence=seq,
esc=self.esc,
- separators=self.separators[2:],
+ separators=self.separators,
factory=self.factory,
)
def create_repetition(self, seq):
"""Create a new :py:class:`hl7.Repetition` compatible with this container"""
return self.factory.create_repetition(
- self.separators[3],
- seq,
+ sequence=seq,
esc=self.esc,
- separators=self.separators[3:],
+ separators=self.separators,
factory=self.factory,
)
def create_component(self, seq):
"""Create a new :py:class:`hl7.Component` compatible with this container"""
return self.factory.create_component(
- self.separators[4],
- seq,
+ sequence=seq,
esc=self.esc,
- separators=self.separators[4:],
+ separators=self.separators,
factory=self.factory,
)
+ def __getitem__(self, item):
+ # Python slice operator was returning a regular list, not a
+ # Container subclass
+ sequence = super(Container, self).__getitem__(item)
+ if isinstance(item, slice):
+ return self.__class__(
+ self.separator,
+ sequence,
+ self.esc,
+ self.separators,
+ factory=self.factory,
+ )
+ return sequence
+
+ def __getslice__(self, i, j):
+ # Python 2.x compatibility. __getslice__ is deprecated, and
+ # we want to wrap the logic from __getitem__ when handling slices
+ return self.__getitem__(slice(i, j))
+
+ def __str__(self):
+ return self.separator.join((str(x) for x in self))
-class File(Container, BuilderMixin):
+
+class File(Container):
"""Representation of an HL7 file from the batch protocol.
It contains a list of :py:class:`hl7.Batch`
instances. It may contain FHS/FTS :py:class:`hl7.Segment` instances.
@@ -167,10 +155,11 @@ class File(Container, BuilderMixin):
"""
def __init__(
- self, separator, sequence=[], esc="\\", separators="\r|~^&", factory=None
+ self, separator=None, sequence=[], esc="\\", separators="\r|~^&", factory=None
):
+ assert not separator or separator == separators[0]
super(File, self).__init__(
- separator,
+ separator=separators[0],
sequence=sequence,
esc=esc,
separators=separators,
@@ -247,7 +236,7 @@ class File(Container, BuilderMixin):
)
-class Batch(Container, BuilderMixin):
+class Batch(Container):
"""Representation of an HL7 batch from the batch protocol.
It contains a list of :py:class:`hl7.Message` instances.
It may contain BHS/BTS :py:class:`hl7.Segment` instances.
@@ -257,10 +246,11 @@ class Batch(Container, BuilderMixin):
"""
def __init__(
- self, separator, sequence=[], esc="\\", separators="\r|~^&", factory=None
+ self, separator=None, sequence=[], esc="\\", separators="\r|~^&", factory=None
):
+ assert not separator or separator == separators[0]
super(Batch, self).__init__(
- separator,
+ separator=separators[0],
sequence=sequence,
esc=esc,
separators=separators,
@@ -337,7 +327,19 @@ class Batch(Container, BuilderMixin):
)
-class Message(Container, BuilderMixin):
+class Message(Container):
+ def __init__(
+ self, separator=None, sequence=[], esc="\\", separators="\r|~^&", factory=None
+ ):
+ assert not separator or separator == separators[0]
+ super(Message, self).__init__(
+ separator=separators[0],
+ sequence=sequence,
+ esc=esc,
+ separators=separators,
+ factory=factory,
+ )
+
"""Representation of an HL7 message. It contains a list
of :py:class:`hl7.Segment` instances.
"""
@@ -379,7 +381,7 @@ class Message(Container, BuilderMixin):
If key is an integer, ``__setitem__`` acts list a list, setting
the :py:class:`hl7.Segment` held at that index:
- >>> h[1] = hl7.Segment("|", [hl7.Field("^", ['PID'], [''])])
+ >>> h[1] = hl7.Segment("|", [hl7.Field("~", ['PID'], [''])])
If the key is a string of length greater than 3,
the key is parsed into an :py:class:`hl7.Accessor` and passed
@@ -439,87 +441,35 @@ class Message(Container, BuilderMixin):
subcomponent_num=1,
):
"""
- Extract a field using a future proofed approach, based on rules in:
- http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing
+ Extract a field using a future proofed approach, based on rules in:
+ http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing
- 'PID|Field1|Component1^Component2|Component1^Sub-Component1&Sub-Component2^Component3|Repeat1~Repeat2',
+ 'PID|Field1|Component1^Component2|Component1^Sub-Component1&Sub-Component2^Component3|Repeat1~Repeat2',
- | PID.F3.R1.C2.S2 = 'Sub-Component2'
- | PID.F4.R2.C1 = 'Repeat1'
+ | PID.F3.R1.C2.S2 = 'Sub-Component2'
+ | PID.F4.R2.C1 = 'Repeat1'
- Compatibility Rules:
+ Compatibility Rules:
- If the parse tree is deeper than the specified path continue
- following the first child branch until a leaf of the tree is
- encountered and return that value (which could be blank).
+ If the parse tree is deeper than the specified path continue
+ following the first child branch until a leaf of the tree is
+ encountered and return that value (which could be blank).
- Example:
+ Example:
- | PID.F3.R1.C2 = 'Sub-Component1' (assume .SC1)
+ | PID.F3.R1.C2 = 'Sub-Component1' (assume .SC1)
- If the parse tree terminates before the full path is satisfied
- check each of the subsequent paths and if every one is specified
- at position 1 then the leaf value reached can be returned as the
- result.
+ If the parse tree terminates before the full path is satisfied
+ check each of the subsequent paths and if every one is specified
+ at position 1 then the leaf value reached can be returned as the
+ result.
- | PID.F4.R1.C1.SC1 = 'Repeat1' (ignore .SC1)
+ | PID.F4.R1.C1.SC1 = 'Repeat1' (ignore .SC1)
"""
- # Save original values for error messages
- accessor = Accessor(
- segment, segment_num, field_num, repeat_num, component_num, subcomponent_num
+ return self.segments(segment)(segment_num).extract_field(
+ segment_num, field_num, repeat_num, component_num, subcomponent_num
)
- field_num = field_num or 1
- repeat_num = repeat_num or 1
- component_num = component_num or 1
- subcomponent_num = subcomponent_num or 1
-
- segment = self.segments(segment)(segment_num)
- if field_num < len(segment):
- field = segment(field_num)
- else:
- if repeat_num == 1 and component_num == 1 and subcomponent_num == 1:
- return "" # Assume non-present optional value
- raise IndexError("Field not present: {0}".format(accessor.key))
-
- rep = field(repeat_num)
-
- if not isinstance(rep, Repetition):
- # leaf
- if component_num == 1 and subcomponent_num == 1:
- return (
- rep
- if accessor.segment == "MSH" and accessor.field_num in (1, 2)
- else self.unescape(rep)
- )
- raise IndexError(
- "Field reaches leaf node before completing path: {0}".format(
- accessor.key
- )
- )
-
- if component_num > len(rep):
- if subcomponent_num == 1:
- return "" # Assume non-present optional value
- raise IndexError("Component not present: {0}".format(accessor.key))
-
- component = rep(component_num)
- if not isinstance(component, Component):
- # leaf
- if subcomponent_num == 1:
- return self.unescape(component)
- raise IndexError(
- "Field reaches leaf node before completing path: {0}".format(
- accessor.key
- )
- )
-
- if subcomponent_num <= len(component):
- subcomponent = component(subcomponent_num)
- return self.unescape(subcomponent)
- else:
- return "" # Assume non-present optional value
-
def assign_field(
self,
value,
@@ -531,209 +481,66 @@ class Message(Container, BuilderMixin):
subcomponent_num=None,
):
"""
- Assign a value into a message using the tree based assignment notation.
- The segment must exist.
+ Assign a value into a message using the tree based assignment notation.
+ The segment must exist.
- Extract a field using a future proofed approach, based on rules in:
- http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing
+ Extract a field using a future proofed approach, based on rules in:
+ http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing
"""
- segment = self.segments(segment)(segment_num)
-
- while len(segment) <= field_num:
- segment.append(self.create_field([]))
- field = segment(field_num)
- if repeat_num is None:
- field[:] = [value]
- return
- while len(field) < repeat_num:
- field.append(self.create_repetition([]))
- repetition = field(repeat_num)
- if component_num is None:
- repetition[:] = [value]
- return
- while len(repetition) < component_num:
- repetition.append(self.create_component([]))
- component = repetition(component_num)
- if subcomponent_num is None:
- component[:] = [value]
- return
- while len(component) < subcomponent_num:
- component.append("")
- component(subcomponent_num, value)
+ self.segments(segment)(segment_num).assign_field(
+ value, field_num, repeat_num, component_num, subcomponent_num
+ )
def escape(self, field, app_map=None):
"""
- See: http://www.hl7standards.com/blog/2006/11/02/hl7-escape-sequences/
+ See: http://www.hl7standards.com/blog/2006/11/02/hl7-escape-sequences/
- To process this correctly, the full set of separators (MSH.1/MSH.2) needs to be known.
+ To process this correctly, the full set of separators (MSH.1/MSH.2) needs to be known.
- Pass through the message. Replace recognised characters with their escaped
- version. Return an ascii encoded string.
+ Pass through the message. Replace recognised characters with their escaped
+ version. Return an ascii encoded string.
- Functionality:
+ Functionality:
- * Replace separator characters (2.10.4)
- * replace application defined characters (2.10.7)
- * Replace non-ascii values with hex versions using HL7 conventions.
+ * Replace separator characters (2.10.4)
+ * replace application defined characters (2.10.7)
+ * Replace non-ascii values with hex versions using HL7 conventions.
- Incomplete:
+ Incomplete:
- * replace highlight characters (2.10.3)
- * How to handle the rich text substitutions.
- * Merge contiguous hex values
+ * replace highlight characters (2.10.3)
+ * How to handle the rich text substitutions.
+ * Merge contiguous hex values
"""
- if not field:
- return field
-
- esc = str(self.esc)
-
- DEFAULT_MAP = {
- self.separators[1]: "F", # 2.10.4
- self.separators[2]: "R",
- self.separators[3]: "S",
- self.separators[4]: "T",
- self.esc: "E",
- "\r": ".br", # 2.10.6
- }
-
- rv = []
- for offset, c in enumerate(field):
- if app_map and c in app_map:
- rv.append(esc + app_map[c] + esc)
- elif c in DEFAULT_MAP:
- rv.append(esc + DEFAULT_MAP[c] + esc)
- elif ord(c) >= 0x20 and ord(c) <= 0x7E:
- rv.append(c)
- else:
- rv.append("%sX%2x%s" % (esc, ord(c), esc))
-
- return "".join(rv)
-
- def unescape(self, field, app_map=None): # noqa: C901
+ return escape(self, field, app_map)
+
+ def unescape(self, field, app_map=None):
"""
- See: http://www.hl7standards.com/blog/2006/11/02/hl7-escape-sequences/
+ See: http://www.hl7standards.com/blog/2006/11/02/hl7-escape-sequences/
- To process this correctly, the full set of separators (MSH.1/MSH.2) needs to be known.
+ To process this correctly, the full set of separators (MSH.1/MSH.2) needs to be known.
- This will convert the identifiable sequences.
- If the application provides mapping, these are also used.
- Items which cannot be mapped are removed
+ This will convert the identifiable sequences.
+ If the application provides mapping, these are also used.
+ Items which cannot be mapped are removed
- For example, the App Map count provide N, H, Zxxx values
+ For example, the App Map count provide N, H, Zxxx values
- Chapter 2: Section 2.10
+ Chapter 2: Section 2.10
- At the moment, this functionality can:
+ At the moment, this functionality can:
- * replace the parsing characters (2.10.4)
- * replace highlight characters (2.10.3)
- * replace hex characters. (2.10.5)
- * replace rich text characters (2.10.6)
- * replace application defined characters (2.10.7)
+ * replace the parsing characters (2.10.4)
+ * replace highlight characters (2.10.3)
+ * replace hex characters. (2.10.5)
+ * replace rich text characters (2.10.6)
+ * replace application defined characters (2.10.7)
- It cannot:
+ It cannot:
- * switch code pages / ISO IR character sets
+ * switch code pages / ISO IR character sets
"""
- if not field or field.find(self.esc) == -1:
- return field
-
- DEFAULT_MAP = {
- "H": "_", # Override using the APP MAP: 2.10.3
- "N": "_", # Override using the APP MAP
- "F": self.separators[1], # 2.10.4
- "R": self.separators[2],
- "S": self.separators[3],
- "T": self.separators[4],
- "E": self.esc,
- ".br": "\r", # 2.10.6
- ".sp": "\r",
- ".fi": "",
- ".nf": "",
- ".in": " ",
- ".ti": " ",
- ".sk": " ",
- ".ce": "\r",
- }
-
- rv = []
- collecting = []
- in_seq = False
- for offset, c in enumerate(field):
- if in_seq:
- if c == self.esc:
- in_seq = False
- value = "".join(collecting)
- collecting = []
- if not value:
- logger.warn(
- "Error unescaping value [%s], empty sequence found at %d",
- field,
- offset,
- )
- continue
- if app_map and value in app_map:
- rv.append(app_map[value])
- elif value in DEFAULT_MAP:
- rv.append(DEFAULT_MAP[value])
- elif value.startswith(".") and (
- (app_map and value[:3] in app_map) or value[:3] in DEFAULT_MAP
- ):
- # Substitution with a number of repetitions defined (2.10.6)
- if app_map and value[:3] in app_map:
- ch = app_map[value[:3]]
- else:
- ch = DEFAULT_MAP[value[:3]]
- count = int(value[3:])
- rv.append(ch * count)
-
- elif (
- value[0] == "C"
- ): # Convert to new Single Byte character set : 2.10.2
- # Two HEX values, first value chooses the character set (ISO-IR), second gives the value
- logger.warn(
- "Error inline character sets [%s] not implemented, field [%s], offset [%s]",
- value,
- field,
- offset,
- )
- elif (
- value[0] == "M"
- ): # Switch to new Multi Byte character set : 2.10.2
- # Three HEX values, first value chooses the character set (ISO-IR), rest give the value
- logger.warn(
- "Error inline character sets [%s] not implemented, field [%s], offset [%s]",
- value,
- field,
- offset,
- )
- elif value[0] == "X": # Hex encoded Bytes: 2.10.5
- value = value[1:]
- try:
- for off in range(0, len(value), 2):
- rv.append(chr(int(value[off : off + 2], 16)))
- except Exception:
- logger.exception(
- "Error decoding hex value [%s], field [%s], offset [%s]",
- value,
- field,
- offset,
- )
- else:
- logger.exception(
- "Error decoding value [%s], field [%s], offset [%s]",
- value,
- field,
- offset,
- )
- else:
- collecting.append(c)
- elif c == self.esc:
- in_seq = True
- else:
- rv.append(str(c))
-
- return "".join(rv)
+ return unescape(self, field, app_map)
def create_ack(
self, ack_code="AA", message_id=None, application=None, facility=None
@@ -753,45 +560,37 @@ class Message(Container, BuilderMixin):
"""
source_msh = self.segment("MSH")
msh = self.create_segment([self.create_field(["MSH"])])
- msa = self.create_segment([self.create_field(["MSA"])])
- ack = self.create_message([msh, msa])
- ack.assign_field(str(source_msh(1)), "MSH", 1, 1)
- ack.assign_field(str(source_msh(2)), "MSH", 1, 2)
+ msh.assign_field(str(source_msh(1)), 1)
+ msh.assign_field(str(source_msh(2)), 2)
# Sending application is source receving application
- ack.assign_field(
- str(application) if application is not None else str(source_msh(5)),
- "MSH",
- 1,
- 3,
+ msh.assign_field(
+ str(application) if application is not None else str(source_msh(5)), 3
)
# Sending facility is source receving facility
- ack.assign_field(
- str(facility) if facility is not None else str(source_msh(6)), "MSH", 1, 4
+ msh.assign_field(
+ str(facility) if facility is not None else str(source_msh(6)), 4
)
# Receiving application is source sending application
- ack.assign_field(str(source_msh(3)), "MSH", 1, 5)
+ msh.assign_field(str(source_msh(3)), 5)
# Receiving facility is source sending facility
- ack.assign_field(str(source_msh(4)), "MSH", 1, 6)
- ack.assign_field(
- str(datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S")), "MSH", 1, 7
- )
+ msh.assign_field(str(source_msh(4)), 6)
+ msh.assign_field(str(datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S")), 7)
# Message type code
- ack.assign_field("ACK", "MSH", 1, 9, 1, 1)
+ msh.assign_field("ACK", 9, 1, 1)
# Copy trigger event from source
- ack.assign_field(str(source_msh(9)(1)(2)), "MSH", 1, 9, 1, 2)
- ack.assign_field("ACK", "MSH", 1, 9, 1, 3)
- ack.assign_field(
- message_id if message_id is not None else generate_message_control_id(),
- "MSH",
- 1,
- 10,
+ msh.assign_field(str(source_msh(9)(1)(2)), 9, 1, 2)
+ msh.assign_field("ACK", 9, 1, 3)
+ msh.assign_field(
+ message_id if message_id is not None else generate_message_control_id(), 10
)
- ack.assign_field(str(source_msh(11)), "MSH", 1, 11)
- ack.assign_field(str(source_msh(12)), "MSH", 1, 12)
+ msh.assign_field(str(source_msh(11)), 11)
+ msh.assign_field(str(source_msh(12)), 12)
- ack.assign_field(str(ack_code), "MSA", 1, 1)
- ack.assign_field(str(source_msh(10)), "MSA", 1, 2)
+ msa = self.create_segment([self.create_field(["MSA"])])
+ msa.assign_field(str(ack_code), 1)
+ msa.assign_field(str(source_msh(10)), 2)
+ ack = self.create_message([msh, msa])
return ack
@@ -812,12 +611,156 @@ class Message(Container, BuilderMixin):
class Segment(Container):
+ def __init__(
+ self, separator=None, sequence=[], esc="\\", separators="\r|~^&", factory=None
+ ):
+ assert not separator or separator == separators[1]
+ super(Segment, self).__init__(
+ separator=separators[1],
+ sequence=sequence,
+ esc=esc,
+ separators=separators,
+ factory=factory,
+ )
+
"""Second level of an HL7 message, which represents an HL7 Segment.
Traditionally this is a line of a message that ends with a carriage
return and is separated by pipes. It contains a list of
:py:class:`hl7.Field` instances.
"""
+ def extract_field(
+ self,
+ segment_num=1,
+ field_num=1,
+ repeat_num=1,
+ component_num=1,
+ subcomponent_num=1,
+ ):
+ """
+ Extract a field using a future proofed approach, based on rules in:
+ http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing
+
+ 'PID|Field1|Component1^Component2|Component1^Sub-Component1&Sub-Component2^Component3|Repeat1~Repeat2',
+
+ | F3.R1.C2.S2 = 'Sub-Component2'
+ | F4.R2.C1 = 'Repeat1'
+
+ Compatibility Rules:
+
+ If the parse tree is deeper than the specified path continue
+ following the first child branch until a leaf of the tree is
+ encountered and return that value (which could be blank).
+
+ Example:
+
+ | F3.R1.C2 = 'Sub-Component1' (assume .SC1)
+
+ If the parse tree terminates before the full path is satisfied
+ check each of the subsequent paths and if every one is specified
+ at position 1 then the leaf value reached can be returned as the
+ result.
+
+ | F4.R1.C1.SC1 = 'Repeat1' (ignore .SC1)
+ """
+ # Save original values for error messages
+ accessor = Accessor(
+ self[0][0],
+ segment_num,
+ field_num,
+ repeat_num,
+ component_num,
+ subcomponent_num,
+ )
+
+ field_num = field_num or 1
+ repeat_num = repeat_num or 1
+ component_num = component_num or 1
+ subcomponent_num = subcomponent_num or 1
+
+ if field_num < len(self):
+ field = self(field_num)
+ else:
+ if repeat_num == 1 and component_num == 1 and subcomponent_num == 1:
+ return "" # Assume non-present optional value
+ raise IndexError("Field not present: {0}".format(accessor.key))
+
+ rep = field(repeat_num)
+
+ if not isinstance(rep, Repetition):
+ # leaf
+ if component_num == 1 and subcomponent_num == 1:
+ return (
+ rep
+ if accessor.segment == "MSH" and accessor.field_num in (1, 2)
+ else unescape(self, rep)
+ )
+ raise IndexError(
+ "Field reaches leaf node before completing path: {0}".format(
+ accessor.key
+ )
+ )
+
+ if component_num > len(rep):
+ if subcomponent_num == 1:
+ return "" # Assume non-present optional value
+ raise IndexError("Component not present: {0}".format(accessor.key))
+
+ component = rep(component_num)
+ if not isinstance(component, Component):
+ # leaf
+ if subcomponent_num == 1:
+ return unescape(self, component)
+ raise IndexError(
+ "Field reaches leaf node before completing path: {0}".format(
+ accessor.key
+ )
+ )
+
+ if subcomponent_num <= len(component):
+ subcomponent = component(subcomponent_num)
+ return unescape(self, subcomponent)
+ else:
+ return "" # Assume non-present optional value
+
+ def assign_field(
+ self,
+ value,
+ field_num=None,
+ repeat_num=None,
+ component_num=None,
+ subcomponent_num=None,
+ ):
+ """
+ Assign a value into a message using the tree based assignment notation.
+ The segment must exist.
+
+ Extract a field using a future proofed approach, based on rules in:
+ http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing
+ """
+
+ while len(self) <= field_num:
+ self.append(self.create_field([]))
+ field = self(field_num)
+ if repeat_num is None:
+ field[:] = [value]
+ return
+ while len(field) < repeat_num:
+ field.append(self.create_repetition([]))
+ repetition = field(repeat_num)
+ if component_num is None:
+ repetition[:] = [value]
+ return
+ while len(repetition) < component_num:
+ repetition.append(self.create_component([]))
+ component = repetition(component_num)
+ if subcomponent_num is None:
+ component[:] = [value]
+ return
+ while len(component) < subcomponent_num:
+ component.append("")
+ component(subcomponent_num, value)
+
def _adjust_index(self, index):
# First element is the segment name, so we don't need to adjust to get 1-based
return index
@@ -835,6 +778,18 @@ class Segment(Container):
class Field(Container):
+ def __init__(
+ self, separator=None, sequence=[], esc="\\", separators="\r|~^&", factory=None
+ ):
+ assert not separator or separator == separators[2]
+ super(Field, self).__init__(
+ separator=separators[2],
+ sequence=sequence,
+ esc=esc,
+ separators=separators,
+ factory=factory,
+ )
+
"""Third level of an HL7 message, that traditionally is surrounded
by pipes and separated by carets. It contains a list of strings
or :py:class:`hl7.Repetition` instances.
@@ -842,12 +797,36 @@ class Field(Container):
class Repetition(Container):
+ def __init__(
+ self, separator=None, sequence=[], esc="\\", separators="\r|~^&", factory=None
+ ):
+ assert not separator or separator == separators[3]
+ super(Repetition, self).__init__(
+ separator=separators[3],
+ sequence=sequence,
+ esc=esc,
+ separators=separators,
+ factory=factory,
+ )
+
"""Fourth level of an HL7 message. A field can repeat.
It contains a list of strings or :py:class:`hl7.Component` instances.
"""
class Component(Container):
+ def __init__(
+ self, separator=None, sequence=[], esc="\\", separators="\r|~^&", factory=None
+ ):
+ assert not separator or separator == separators[4]
+ super(Component, self).__init__(
+ separator=separators[4],
+ sequence=sequence,
+ esc=esc,
+ separators=separators,
+ factory=factory,
+ )
+
"""Fifth level of an HL7 message. A component is a composite datatypes.
It contains a list of string sub-components.
"""
=====================================
hl7/mllp/streams.py
=====================================
@@ -201,7 +201,8 @@ class HL7StreamProtocol(StreamReaderProtocol):
self._encoding_errors = encoding_errors
def connection_made(self, transport):
- if self._reject_connection:
+ # _reject_connection not added until 3.8
+ if getattr(self, "_reject_connection", False):
context = {
"message": (
"An open stream was garbage collected prior to "
@@ -308,6 +309,5 @@ class HL7StreamWriter(MLLPStreamWriter):
self._encoding_errors = encoding_errors or "strict"
def writemessage(self, message):
- """Writes an :py:class:`hl7.Message` to the stream.
- """
+ """Writes an :py:class:`hl7.Message` to the stream."""
self.writeblock(str(message).encode(self.encoding, self.encoding_errors))
=====================================
hl7/parser.py
=====================================
@@ -96,15 +96,13 @@ def parse(lines, encoding="utf-8", factory=Factory):
strmsg = lines.strip()
# The method for parsing the message
plan = create_parse_plan(strmsg, factory)
- # Start spliting the methods based upon the ParsePlan
+ # Start splitting the methods based upon the ParsePlan
return _split(strmsg, plan)
def _create_batch(batch, messages, encoding, factory):
- """Creates a :py:class:`hl7.Batch`
- """
+ """Creates a :py:class:`hl7.Batch`"""
kwargs = {
- "separator": "\r",
"sequence": [
parse(message, encoding=encoding, factory=factory) for message in messages
],
@@ -189,7 +187,6 @@ def parse_batch(lines, encoding="utf-8", factory=Factory):
def _create_file(file, batches, encoding, factory):
kwargs = {
- "separator": "\r",
"sequence": [
_create_batch(batch[0], batch[1], encoding, factory) for batch in batches
],
@@ -250,7 +247,7 @@ def parse_file(lines, encoding="utf-8", factory=Factory): # noqa: C901
batches = []
messages = []
in_batch = False
- # Split the file into lines, reatining the ends
+ # Split the file into lines, retaining the ends
for line in lines.strip(_HL7_WHITESPACE).splitlines(keepends=True):
# strip out all whitespace MINUS the '\r'
line = line.strip(_HL7_WHITESPACE)
@@ -319,9 +316,15 @@ def _split(text, plan):
seps = text[4:sep_end_off]
text = text[sep_end_off + 1 :]
data = [
- plan.factory.create_field("", [seg]),
- plan.factory.create_field("", [sep0]),
- plan.factory.create_field(sep0, [seps]),
+ plan.factory.create_field(
+ sequence=[seg], esc=plan.esc, separators=plan.separators
+ ),
+ plan.factory.create_field(
+ sequence=[sep0], esc=plan.esc, separators=plan.separators
+ ),
+ plan.factory.create_field(
+ sequence=[seps], esc=plan.esc, separators=plan.separators
+ ),
]
else:
data = []
@@ -338,7 +341,7 @@ def create_parse_plan(strmsg, factory=Factory):
the details stored within the message.
"""
# We will always use a carriage return to separate segments
- separators = ["\r"]
+ separators = "\r"
# Extract the rest of the separators. Defaults used if not present.
if strmsg[:3] not in ("MSH", "FHS", "BHS"):
@@ -348,19 +351,19 @@ def create_parse_plan(strmsg, factory=Factory):
sep0 = strmsg[3]
seps = list(strmsg[3 : strmsg.find(sep0, 4)])
- separators.append(seps[0])
+ separators += seps[0]
if len(seps) > 2:
- separators.append(seps[2]) # repetition separator
+ separators += seps[2] # repetition separator
else:
- separators.append("~") # repetition separator
+ separators += "~" # repetition separator
if len(seps) > 1:
- separators.append(seps[1]) # component separator
+ separators += seps[1] # component separator
else:
- separators.append("^") # component separator
+ separators += "^" # component separator
if len(seps) > 4:
- separators.append(seps[4]) # sub-component separator
+ separators += seps[4] # sub-component separator
else:
- separators.append("&") # sub-component separator
+ separators += "&" # sub-component separator
if len(seps) > 3:
esc = seps[3]
else:
@@ -374,7 +377,7 @@ def create_parse_plan(strmsg, factory=Factory):
factory.create_repetition,
factory.create_component,
]
- return _ParsePlan(separators, containers, esc, factory)
+ return _ParsePlan(separators[0], separators, containers, esc, factory)
class _ParsePlan(object):
@@ -384,27 +387,26 @@ class _ParsePlan(object):
# field, component, repetition, escape, subcomponent
- def __init__(self, separators, containers, esc, factory):
+ def __init__(self, seperator, separators, containers, esc, factory):
# TODO test to see performance implications of the assertion
# since we generate the ParsePlan, this should never be in
# invalid state
- assert len(containers) == len(separators)
+ assert len(containers) == len(separators[separators.find(seperator) :])
+ self.separator = seperator
self.separators = separators
self.containers = containers
self.esc = esc
self.factory = factory
- @property
- def separator(self):
- """Return the current separator to use based on the plan."""
- return self.separators[0]
-
def container(self, data):
- """Return an instance of the approriate container for the *data*
+ """Return an instance of the appropriate container for the *data*
as specified by the current plan.
"""
return self.containers[0](
- self.separator, data, self.esc, self.separators, self.factory
+ sequence=data,
+ esc=self.esc,
+ separators=self.separators,
+ factory=self.factory,
)
def next(self):
@@ -417,7 +419,11 @@ class _ParsePlan(object):
# the separators and containers lists. Use self.__class__()
# in case :class:`hl7.ParsePlan` is subclassed
return self.__class__(
- self.separators[1:], self.containers[1:], self.esc, self.factory
+ self.separators[self.separators.find(self.separator) + 1],
+ self.separators,
+ self.containers[1:],
+ self.esc,
+ self.factory,
)
# When we have no separators and containers left, return None,
# which indicates that we have nothing further.
@@ -425,7 +431,7 @@ class _ParsePlan(object):
def applies(self, text):
"""return True if the separator or those if the children are in the text"""
- for s in self.separators:
+ for s in self.separators[self.separators.find(self.separator) :]:
if text.find(s) >= 0:
return True
return False
=====================================
hl7/util.py
=====================================
@@ -15,15 +15,20 @@ def ishl7(line):
:rtype: bool
"""
# Prevent issues if the line is empty
- return line and line.strip()[:3] == "MSH" and line.count("MSH") == 1
+ if not line:
+ return False
+ msh = line.strip()[:4]
+ if len(msh) != 4:
+ return False
+ return msh[:3] == "MSH" and line.count("\rMSH" + msh[3]) == 0
def isbatch(line):
"""
- Batches are wrapped in BHS / BTS or have more than one
- message
- BHS = batch header segment
- BTS = batch trailer segment
+ Batches are wrapped in BHS / BTS or have more than one
+ message
+ BHS = batch header segment
+ BTS = batch trailer segment
"""
return line and (
line.strip()[:3] == "BHS"
@@ -33,18 +38,18 @@ def isbatch(line):
def isfile(line):
"""
- Files are wrapped in FHS / FTS, or may be a batch
- FHS = file header segment
- FTS = file trailer segment
+ Files are wrapped in FHS / FTS, or may be a batch
+ FHS = file header segment
+ FTS = file trailer segment
"""
return line and (line.strip()[:3] == "FHS" or isbatch(line))
def split_file(hl7file):
"""
- Given a file, split out the messages.
- Does not do any validation on the message.
- Throws away batch and file segments.
+ Given a file, split out the messages.
+ Does not do any validation on the message.
+ Throws away batch and file segments.
"""
rv = []
for line in hl7file.split("\r"):
@@ -81,3 +86,177 @@ def generate_message_control_id():
# Add 4 chars of uniqueness
unique = "".join(random.sample(alphanumerics, 4))
return timestamp + unique
+
+
+def escape(container, field, app_map=None):
+ """
+ See: http://www.hl7standards.com/blog/2006/11/02/hl7-escape-sequences/
+
+ To process this correctly, the full set of separators (MSH.1/MSH.2) needs to be known.
+
+ Pass through the message. Replace recognised characters with their escaped
+ version. Return an ascii encoded string.
+
+ Functionality:
+
+ * Replace separator characters (2.10.4)
+ * replace application defined characters (2.10.7)
+ * Replace non-ascii values with hex versions using HL7 conventions.
+
+ Incomplete:
+
+ * replace highlight characters (2.10.3)
+ * How to handle the rich text substitutions.
+ * Merge contiguous hex values
+ """
+ if not field:
+ return field
+
+ esc = str(container.esc)
+
+ DEFAULT_MAP = {
+ container.separators[1]: "F", # 2.10.4
+ container.separators[2]: "R",
+ container.separators[3]: "S",
+ container.separators[4]: "T",
+ container.esc: "E",
+ "\r": ".br", # 2.10.6
+ }
+
+ rv = []
+ for offset, c in enumerate(field):
+ if app_map and c in app_map:
+ rv.append(esc + app_map[c] + esc)
+ elif c in DEFAULT_MAP:
+ rv.append(esc + DEFAULT_MAP[c] + esc)
+ elif ord(c) >= 0x20 and ord(c) <= 0x7E:
+ rv.append(c)
+ else:
+ rv.append("%sX%2x%s" % (esc, ord(c), esc))
+
+ return "".join(rv)
+
+
+def unescape(container, field, app_map=None): # noqa: C901
+ """
+ See: http://www.hl7standards.com/blog/2006/11/02/hl7-escape-sequences/
+
+ To process this correctly, the full set of separators (MSH.1/MSH.2) needs to be known.
+
+ This will convert the identifiable sequences.
+ If the application provides mapping, these are also used.
+ Items which cannot be mapped are removed
+
+ For example, the App Map count provide N, H, Zxxx values
+
+ Chapter 2: Section 2.10
+
+ At the moment, this functionality can:
+
+ * replace the parsing characters (2.10.4)
+ * replace highlight characters (2.10.3)
+ * replace hex characters. (2.10.5)
+ * replace rich text characters (2.10.6)
+ * replace application defined characters (2.10.7)
+
+ It cannot:
+
+ * switch code pages / ISO IR character sets
+ """
+ if not field or field.find(container.esc) == -1:
+ return field
+
+ DEFAULT_MAP = {
+ "H": "_", # Override using the APP MAP: 2.10.3
+ "N": "_", # Override using the APP MAP
+ "F": container.separators[1], # 2.10.4
+ "R": container.separators[2],
+ "S": container.separators[3],
+ "T": container.separators[4],
+ "E": container.esc,
+ ".br": "\r", # 2.10.6
+ ".sp": "\r",
+ ".fi": "",
+ ".nf": "",
+ ".in": " ",
+ ".ti": " ",
+ ".sk": " ",
+ ".ce": "\r",
+ }
+
+ rv = []
+ collecting = []
+ in_seq = False
+ for offset, c in enumerate(field):
+ if in_seq:
+ if c == container.esc:
+ in_seq = False
+ value = "".join(collecting)
+ collecting = []
+ if not value:
+ logger.warn(
+ "Error unescaping value [%s], empty sequence found at %d",
+ field,
+ offset,
+ )
+ continue
+ if app_map and value in app_map:
+ rv.append(app_map[value])
+ elif value in DEFAULT_MAP:
+ rv.append(DEFAULT_MAP[value])
+ elif value.startswith(".") and (
+ (app_map and value[:3] in app_map) or value[:3] in DEFAULT_MAP
+ ):
+ # Substitution with a number of repetitions defined (2.10.6)
+ if app_map and value[:3] in app_map:
+ ch = app_map[value[:3]]
+ else:
+ ch = DEFAULT_MAP[value[:3]]
+ count = int(value[3:])
+ rv.append(ch * count)
+
+ elif (
+ value[0] == "C"
+ ): # Convert to new Single Byte character set : 2.10.2
+ # Two HEX values, first value chooses the character set (ISO-IR), second gives the value
+ logger.warn(
+ "Error inline character sets [%s] not implemented, field [%s], offset [%s]",
+ value,
+ field,
+ offset,
+ )
+ elif value[0] == "M": # Switch to new Multi Byte character set : 2.10.2
+ # Three HEX values, first value chooses the character set (ISO-IR), rest give the value
+ logger.warn(
+ "Error inline character sets [%s] not implemented, field [%s], offset [%s]",
+ value,
+ field,
+ offset,
+ )
+ elif value[0] == "X": # Hex encoded Bytes: 2.10.5
+ value = value[1:]
+ try:
+ for off in range(0, len(value), 2):
+ rv.append(chr(int(value[off : off + 2], 16)))
+ except Exception:
+ logger.exception(
+ "Error decoding hex value [%s], field [%s], offset [%s]",
+ value,
+ field,
+ offset,
+ )
+ else:
+ logger.exception(
+ "Error decoding value [%s], field [%s], offset [%s]",
+ value,
+ field,
+ offset,
+ )
+ else:
+ collecting.append(c)
+ elif c == container.esc:
+ in_seq = True
+ else:
+ rv.append(str(c))
+
+ return "".join(rv)
=====================================
hl7/version.py
=====================================
@@ -6,7 +6,7 @@ Primary version number source.
Forth element can be 'dev' < 'a' < 'b' < 'rc' < 'final'. An empty 4th
element is equivalent to 'final'.
"""
-VERSION = (0, 4, 2, "final")
+VERSION = (0, 4, 5, "final")
def get_version():
=====================================
requirements.txt
=====================================
@@ -1,7 +1,9 @@
# pip Requirements for developing python-hl7 (not required to use as a library)
-tox==3.14.4
-flake8==3.8.3
-Sphinx==2.4.1
-coverage==5.0.3
-isort==4.3.21
-black==19.10b0; python_version > "3.5"
+tox==3.24.5
+flake8==4.0.1
+Sphinx==4.4.0
+coverage==6.3.2
+isort==5.10.1
+black==22.3.0
+twine==3.8.0
+wheel==0.37.1
=====================================
setup.py
=====================================
@@ -21,6 +21,9 @@ setup(
author="John Paulett",
author_email="john at paulett.org",
url="http://python-hl7.readthedocs.org",
+ project_urls={
+ "Source": "https://github.com/johnpaulett/python-hl7",
+ },
license="BSD",
platforms=["POSIX", "Windows"],
keywords=[
@@ -47,6 +50,10 @@ setup(
install_requires=[],
test_suite="tests",
tests_require=[],
- entry_points={"console_scripts": ["mllp_send=hl7.client:mllp_send",],},
+ entry_points={
+ "console_scripts": [
+ "mllp_send=hl7.client:mllp_send",
+ ],
+ },
zip_safe=True,
)
=====================================
tests/backports/unittest/async_case.py
=====================================
@@ -62,7 +62,7 @@ class IsolatedAsyncioTestCase(TestCase):
# We intentionally don't add inspect.iscoroutinefunction() check
# for func argument because there is no way
# to check for async function reliably:
- # 1. It can be "async def func()" iself
+ # 1. It can be "async def func()" itself
# 2. Class can implement "async def __call__()" method
# 3. Regular "def func()" that returns awaitable object
self.addCleanup(*(func, *args), **kwargs)
=====================================
tests/backports/unittest/case.py
=====================================
@@ -439,7 +439,7 @@ class TestCase(object):
# If a string is longer than _diffThreshold, use normal comparison instead
# of difflib. See #11763.
- _diffThreshold = 2 ** 16
+ _diffThreshold = 2**16
# Attribute used by TestSuite for classSetUp
@@ -449,8 +449,8 @@ class TestCase(object):
def __init__(self, methodName="runTest"):
"""Create an instance of the class that will use the named test
- method when executed. Raises a ValueError if the instance does
- not have a method with the specified name.
+ method when executed. Raises a ValueError if the instance does
+ not have a method with the specified name.
"""
self._testMethodName = methodName
self._outcome = None
@@ -829,29 +829,29 @@ class TestCase(object):
def assertRaises(self, expected_exception, *args, **kwargs):
"""Fail unless an exception of class expected_exception is raised
- by the callable when invoked with specified positional and
- keyword arguments. If a different type of exception is
- raised, it will not be caught, and the test case will be
- deemed to have suffered an error, exactly as for an
- unexpected exception.
+ by the callable when invoked with specified positional and
+ keyword arguments. If a different type of exception is
+ raised, it will not be caught, and the test case will be
+ deemed to have suffered an error, exactly as for an
+ unexpected exception.
- If called with the callable and arguments omitted, will return a
- context object used like this::
+ If called with the callable and arguments omitted, will return a
+ context object used like this::
- with self.assertRaises(SomeException):
- do_something()
+ with self.assertRaises(SomeException):
+ do_something()
- An optional keyword argument 'msg' can be provided when assertRaises
- is used as a context object.
+ An optional keyword argument 'msg' can be provided when assertRaises
+ is used as a context object.
- The context manager keeps a reference to the exception as
- the 'exception' attribute. This allows you to inspect the
- exception after the assertion::
+ The context manager keeps a reference to the exception as
+ the 'exception' attribute. This allows you to inspect the
+ exception after the assertion::
- with self.assertRaises(SomeException) as cm:
- do_something()
- the_exception = cm.exception
- self.assertEqual(the_exception.error_code, 3)
+ with self.assertRaises(SomeException) as cm:
+ do_something()
+ the_exception = cm.exception
+ self.assertEqual(the_exception.error_code, 3)
"""
context = _AssertRaisesContext(expected_exception, self)
try:
@@ -862,31 +862,31 @@ class TestCase(object):
def assertWarns(self, expected_warning, *args, **kwargs):
"""Fail unless a warning of class warnClass is triggered
- by the callable when invoked with specified positional and
- keyword arguments. If a different type of warning is
- triggered, it will not be handled: depending on the other
- warning filtering rules in effect, it might be silenced, printed
- out, or raised as an exception.
-
- If called with the callable and arguments omitted, will return a
- context object used like this::
-
- with self.assertWarns(SomeWarning):
- do_something()
-
- An optional keyword argument 'msg' can be provided when assertWarns
- is used as a context object.
-
- The context manager keeps a reference to the first matching
- warning as the 'warning' attribute; similarly, the 'filename'
- and 'lineno' attributes give you information about the line
- of Python code from which the warning was triggered.
- This allows you to inspect the warning after the assertion::
-
- with self.assertWarns(SomeWarning) as cm:
- do_something()
- the_warning = cm.warning
- self.assertEqual(the_warning.some_attribute, 147)
+ by the callable when invoked with specified positional and
+ keyword arguments. If a different type of warning is
+ triggered, it will not be handled: depending on the other
+ warning filtering rules in effect, it might be silenced, printed
+ out, or raised as an exception.
+
+ If called with the callable and arguments omitted, will return a
+ context object used like this::
+
+ with self.assertWarns(SomeWarning):
+ do_something()
+
+ An optional keyword argument 'msg' can be provided when assertWarns
+ is used as a context object.
+
+ The context manager keeps a reference to the first matching
+ warning as the 'warning' attribute; similarly, the 'filename'
+ and 'lineno' attributes give you information about the line
+ of Python code from which the warning was triggered.
+ This allows you to inspect the warning after the assertion::
+
+ with self.assertWarns(SomeWarning) as cm:
+ do_something()
+ the_warning = cm.warning
+ self.assertEqual(the_warning.some_attribute, 147)
"""
context = _AssertWarnsContext(expected_warning, self)
return context.handle("assertWarns", args, kwargs)
@@ -948,14 +948,14 @@ class TestCase(object):
def assertEqual(self, first, second, msg=None):
"""Fail if the two objects are unequal as determined by the '=='
- operator.
+ operator.
"""
assertion_func = self._getAssertEqualityFunc(first, second)
assertion_func(first, second, msg=msg)
def assertNotEqual(self, first, second, msg=None):
"""Fail if the two objects are equal as determined by the '!='
- operator.
+ operator.
"""
if not first != second:
msg = self._formatMessage(
@@ -965,16 +965,16 @@ class TestCase(object):
def assertAlmostEqual(self, first, second, places=None, msg=None, delta=None):
"""Fail if the two objects are unequal as determined by their
- difference rounded to the given number of decimal places
- (default 7) and comparing to zero, or by comparing that the
- difference between the two objects is more than the given
- delta.
+ difference rounded to the given number of decimal places
+ (default 7) and comparing to zero, or by comparing that the
+ difference between the two objects is more than the given
+ delta.
- Note that decimal places (from zero) are usually not the same
- as significant digits (measured from the most significant digit).
+ Note that decimal places (from zero) are usually not the same
+ as significant digits (measured from the most significant digit).
- If the two objects compare equal then they will automatically
- compare almost equal.
+ If the two objects compare equal then they will automatically
+ compare almost equal.
"""
if first == second:
# shortcut
@@ -1011,14 +1011,14 @@ class TestCase(object):
def assertNotAlmostEqual(self, first, second, places=None, msg=None, delta=None):
"""Fail if the two objects are equal as determined by their
- difference rounded to the given number of decimal places
- (default 7) and comparing to zero, or by comparing that the
- difference between the two objects is less than the given delta.
+ difference rounded to the given number of decimal places
+ (default 7) and comparing to zero, or by comparing that the
+ difference between the two objects is less than the given delta.
- Note that decimal places (from zero) are usually not the same
- as significant digits (measured from the most significant digit).
+ Note that decimal places (from zero) are usually not the same
+ as significant digits (measured from the most significant digit).
- Objects that are equal automatically fail.
+ Objects that are equal automatically fail.
"""
if delta is not None and places is not None:
raise TypeError("specify delta or places not both")
=====================================
tests/samples.py
=====================================
@@ -317,3 +317,18 @@ sample_bad_file3 = "\r".join(
"",
]
)
+
+sample_msh = "\r".join(
+ [
+ "MSH|^~\\&|HNAM_PM|HNA500|AIG||20131017140041||ADT^A01|Q150084616T145947960|P|2.3",
+ "PID|1|2148790^^^MSH_MRN^MR|2148790^^^MSH_MRN^MR~162840^^^MSH_EMPI^CM|3722^0^^MSH_DTC^REFE~184737^0^^IID^DONOR ~Q2147670^0^^MSQ_MRN|RUFUSS^MELLODIAL^^^^^CURRENT||19521129|F|RUFUSS^MELLODIAL^^^^^PREVIOUS|OT|221 CANVIEW AVENUE^66-D^BRONX^NY^10454^USA^HOME^^058||3472444150^HOME~(000)000-0000^ALTERNATE||ENGLISH|M|PEN|O75622322^^^MSH_FIN_NBR^FIN NB|125544697|||HIS|||0",
+ 'PV1|0001|I|MBN1^MBN1^06|4| 863968||03525^FARP^YONAN|03525^FARP^YONAN|""|NUR|||N|5|| U|03525^FARP^YONAN|I|01|T22~SLF|||||||||||||||||||E||AC|||20140210225300|""',
+ 'DG1|0001|I9|440.21^ATHEROSCLEROSIS W/INT CLAUDCTN^I9|ATHEROSCLEROSIS W/INT CLAUDCTN|""|A|||||||.00||9',
+ 'IN1|0001|A10A|A10|HIP COMP MCAID|PO BOX 223^""^NEW YORK^NY^10116^US^^^""|HIP ON LINE|""|""|""|||""|""|25892261^""^""|C|BENNETT^NELLY|4^SELF|10981226|322-10 GOODLIN AVE^APT B31^FLUSHING^NY^11355^US^^^61|Y|""||||||Y||""|||||||-JNJ45517',
+ 'IN2||062420044|""|||""|||||||||||||||||||60094|""|||||||||||||||||||||||||||||||',
+ 'IN1|0002|GMED|""|MEDICAID|""|""|""|""|""|||""|""||X|BENNETT^NELLY|4^SELF|10981226|322-10 GOODLIN AVE^APT B31^FLUSHING^NY^11355^US^^^61|""|""||||||""||||||',
+ 'IN2||062420044|""|||""|||||||||||||||||||""|""||||||||||||||||||||||||||||||||""',
+ 'IN1|0003|SLFJ|""|SELF-PAY|""|""|""|""|""|||""|""||P|BENNETT^NELLY|4^SELF|10981226|322-10 GOODLIN AVE^APT B31^FLUSHING^NY^11355^US^^^61|""|""||||||""||||||',
+ "",
+ ]
+)
=====================================
tests/test_accessor.py
=====================================
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
from unittest import TestCase
-from hl7 import Accessor
+from hl7 import Accessor, Field, Message, Segment
class AccessorTest(TestCase):
@@ -20,3 +20,24 @@ class AccessorTest(TestCase):
def test_equality(self):
self.assertEqual(Accessor("FOO", 1, 3, 4), Accessor("FOO", 1, 3, 4))
self.assertNotEqual(Accessor("FOO", 1), Accessor("FOO", 2))
+
+ def test_string(self):
+ SEP = "|^~\\&"
+ CR_SEP = "\r"
+ MSH = Segment(SEP[0], [Field(SEP[2], ["MSH"])])
+ MSA = Segment(SEP[0], [Field(SEP[2], ["MSA"])])
+ response = Message(CR_SEP, [MSH, MSA])
+ response["MSH.F1.R1"] = SEP[0]
+ response["MSH.F2.R1"] = SEP[1:]
+ self.assertEqual(str(response), "MSH|^~\\&|\rMSA\r")
+
+ response["MSH.F9.R1.C1"] = "ORU"
+ response["MSH.F9.R1.C2"] = "R01"
+ response["MSH.F9.R1.C3"] = ""
+ response["MSH.F12.R1"] = "2.4"
+ response["MSA.F1.R1"] = "AA"
+ response["MSA.F3.R1"] = "Application Message"
+ self.assertEqual(
+ str(response),
+ "MSH|^~\\&|||||||ORU^R01^|||2.4\rMSA|AA||Application Message\r",
+ )
=====================================
tests/test_client.py
=====================================
@@ -43,7 +43,7 @@ class MLLPClientTest(TestCase):
def test_send_message_unicode(self):
self.client.socket.recv.return_value = "thanks"
- result = self.client.send_message(u"foobar")
+ result = self.client.send_message("foobar")
self.assertEqual(result, "thanks")
self.client.socket.send.assert_called_once_with(b"\x0bfoobar\x1c\x0d")
=====================================
tests/test_construction.py
=====================================
@@ -12,8 +12,8 @@ CR_SEP = "\r"
class ConstructionTest(TestCase):
def test_create_msg(self):
# Create a message
- MSH = hl7.Segment(SEP[0], [hl7.Field(SEP[1], ["MSH"])])
- MSA = hl7.Segment(SEP[0], [hl7.Field(SEP[1], ["MSA"])])
+ MSH = hl7.Segment(SEP[0], [hl7.Field(SEP[2], ["MSH"])])
+ MSA = hl7.Segment(SEP[0], [hl7.Field(SEP[2], ["MSA"])])
response = hl7.Message(CR_SEP, [MSH, MSA])
response["MSH.F1.R1"] = SEP[0]
response["MSH.F2.R1"] = SEP[1:]
@@ -21,18 +21,18 @@ class ConstructionTest(TestCase):
def test_append(self):
# Append a segment to a message
- MSH = hl7.Segment(SEP[0], [hl7.Field(SEP[1], ["MSH"])])
+ MSH = hl7.Segment(SEP[0], [hl7.Field(SEP[2], ["MSH"])])
response = hl7.Message(CR_SEP, [MSH])
response["MSH.F1.R1"] = SEP[0]
response["MSH.F2.R1"] = SEP[1:]
- MSA = hl7.Segment(SEP[0], [hl7.Field(SEP[1], ["MSA"])])
+ MSA = hl7.Segment(SEP[0], [hl7.Field(SEP[2], ["MSA"])])
response.append(MSA)
self.assertEqual(str(response), "MSH|^~\\&|\rMSA\r")
def test_append_from_source(self):
# Copy a segment between messages
- MSH = hl7.Segment(SEP[0], [hl7.Field(SEP[1], ["MSH"])])
- MSA = hl7.Segment(SEP[0], [hl7.Field(SEP[1], ["MSA"])])
+ MSH = hl7.Segment(SEP[0], [hl7.Field(SEP[2], ["MSH"])])
+ MSA = hl7.Segment(SEP[0], [hl7.Field(SEP[2], ["MSA"])])
response = hl7.Message(CR_SEP, [MSH, MSA])
response["MSH.F1.R1"] = SEP[0]
response["MSH.F2.R1"] = SEP[1:]
=====================================
tests/test_parse.py
=====================================
@@ -413,7 +413,7 @@ class ParsePlanTest(TestCase):
def test_create_parse_plan(self):
plan = hl7.parser.create_parse_plan(sample_hl7)
- self.assertEqual(plan.separators, ["\r", "|", "~", "^", "&"])
+ self.assertEqual(plan.separators, "\r|~^&")
self.assertEqual(
plan.containers, [Message, Segment, Field, Repetition, Component]
)
@@ -431,19 +431,23 @@ class ParsePlanTest(TestCase):
plan = hl7.parser.create_parse_plan(sample_hl7)
n1 = plan.next()
- self.assertEqual(n1.separators, ["|", "~", "^", "&"])
+ self.assertEqual(n1.separators, "\r|~^&")
+ self.assertEqual(n1.separator, "|")
self.assertEqual(n1.containers, [Segment, Field, Repetition, Component])
n2 = n1.next()
- self.assertEqual(n2.separators, ["~", "^", "&"])
+ self.assertEqual(n2.separators, "\r|~^&")
+ self.assertEqual(n2.separator, "~")
self.assertEqual(n2.containers, [Field, Repetition, Component])
n3 = n2.next()
- self.assertEqual(n3.separators, ["^", "&"])
+ self.assertEqual(n3.separators, "\r|~^&")
+ self.assertEqual(n3.separator, "^")
self.assertEqual(n3.containers, [Repetition, Component])
n4 = n3.next()
- self.assertEqual(n4.separators, ["&"])
+ self.assertEqual(n4.separators, "\r|~^&")
+ self.assertEqual(n4.separator, "&")
self.assertEqual(n4.containers, [Component])
n5 = n4.next()
=====================================
tests/test_util.py
=====================================
@@ -11,6 +11,7 @@ from .samples import (
sample_file1,
sample_file2,
sample_hl7,
+ sample_msh,
)
@@ -23,6 +24,7 @@ class IsHL7Test(TestCase):
self.assertFalse(hl7.ishl7(sample_file))
self.assertFalse(hl7.ishl7(sample_file1))
self.assertFalse(hl7.ishl7(sample_file2))
+ self.assertTrue(hl7.ishl7(sample_msh))
def test_ishl7_empty(self):
self.assertFalse(hl7.ishl7(""))
=====================================
tox.ini
=====================================
@@ -1,17 +1,11 @@
[tox]
envlist =
- py39, py38, py37, py36, py35, docs
+ py310, py39, py38, py37, docs
[testenv]
commands =
python -m unittest discover -t . -s tests
-[testenv:py35]
-basepython = python3.5
-
-[testenv:py36]
-basepython = python3.6
-
[testenv:py37]
basepython = python3.7
@@ -21,6 +15,9 @@ basepython = python3.8
[testenv:py39]
basepython = python3.9
+[testenv:py310]
+basepython = python3.10
+
[testenv:docs]
whitelist_externals = make
deps =
View it on GitLab: https://salsa.debian.org/med-team/python-hl7/-/commit/e5cedb5a3020a4e323f00a099b36ac839eaa98e0
--
View it on GitLab: https://salsa.debian.org/med-team/python-hl7/-/commit/e5cedb5a3020a4e323f00a099b36ac839eaa98e0
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/debian-med-commit/attachments/20220719/51d750ec/attachment-0001.htm>
More information about the debian-med-commit
mailing list