[med-svn] [Git][med-team/enlighten][upstream] New upstream version 1.7.2
Shayan Doust
gitlab at salsa.debian.org
Wed Dec 30 22:26:07 GMT 2020
Shayan Doust pushed to branch upstream at Debian Med / enlighten
Commits:
38b05bf0 by Shayan Doust at 2020-12-30T22:22:54+00:00
New upstream version 1.7.2
- - - - -
17 changed files:
- README.rst
- doc/faq.rst
- doc/install.rst
- doc/spelling_wordlist.txt
- enlighten/__init__.py
- enlighten/_counter.py
- enlighten/_manager.py
- enlighten/_statusbar.py
- − enlighten/_terminal.py
- enlighten/_util.py
- setup.cfg
- setup.py
- setup_helpers.py
- tests/test_counter.py
- tests/test_manager.py
- − tests/test_terminal.py
- tox.ini
Changes:
=====================================
README.rst
=====================================
@@ -1,9 +1,9 @@
.. start-badges
| |docs| |travis| |codecov|
-| |pypi| |supported-versions| |supported-implementations|
| |linux| |windows| |mac| |bsd|
-| |fedora| |EPEL|
+| |pypi| |supported-versions| |supported-implementations|
+| |Fedora| |EPEL| |Arch| |Debian| |Ubuntu|
.. |docs| image:: https://img.shields.io/readthedocs/python-enlighten.svg?style=plastic&logo=read-the-docs
:target: https://python-enlighten.readthedocs.org
@@ -45,13 +45,26 @@
:alt: BSD supported
:target: https://pypi.python.org/pypi/enlighten
-.. |fedora| image:: https://img.shields.io/badge/dynamic/json.svg?uri=https://pdc.fedoraproject.org/rest_api/v1/component-branches/?global_component=python-enlighten;fields=name;active=true;type=rpm&query=$.results[?(@.name.startsWith(%22f%22))].name&label=Fedora&colorB=lightgray&style=plastic&logo=fedora
- :alt: Fedora version support
- :target: https://bodhi.fedoraproject.org/updates/?packages=python-enlighten
+.. |Fedora| image:: https://img.shields.io/fedora/v/python3-enlighten?color=lightgray&logo=Fedora&style=plastic&label=Fedora
+ :alt: Latest Fedora Version
+ :target: https://src.fedoraproject.org/rpms/python-enlighten
+
+.. |EPEL| image:: https://img.shields.io/fedora/v/python3-enlighten/epel8?color=lightgray&label=EPEL&logo=EPEL
+ :alt: Latest EPEL Version
+ :target: https://src.fedoraproject.org/rpms/python-enlighten
+
+.. |Arch| image:: https://img.shields.io/aur/version/python-enlighten?color=lightgray&logo=Arch%20Linux&style=plastic&label=Arch
+ :alt: Latest Arch Linux Version
+ :target: https://aur.archlinux.org/packages/python-enlighten
+
+.. |Debian| image:: https://img.shields.io/debian/v/enlighten/sid?color=lightgray&logo=Debian&style=plastic&label=Debian
+ :alt: Latest Debian Version
+ :target: https://packages.debian.org/source/sid/enlighten
+
+.. |Ubuntu| image:: https://img.shields.io/ubuntu/v/enlighten?color=lightgray&logo=Ubuntu&style=plastic&label=Ubuntu
+ :alt: Latest Ubuntu Version
+ :target: https://launchpad.net/ubuntu/+source/enlighten
-.. |EPEL| image:: https://img.shields.io/badge/dynamic/json.svg?uri=https://pdc.fedoraproject.org/rest_api/v1/component-branches/?global_component=python-enlighten;fields=name;active=true;type=rpm&query=$.results[?(@.name.startsWith(%22e%22))].name&label=EPEL&colorB=lightgray&style=plastic&logo=epel
- :alt: EPEL version support
- :target: https://bodhi.fedoraproject.org/updates/?packages=python-enlighten
.. end-badges
@@ -86,22 +99,53 @@ PIP
$ pip install enlighten
-EL6, EL7, and EL8 (RHEL/CentOS/Scientific)
-------------------------------------------
+
+RPM
+---
+
+Fedora and EL8 (RHEL/CentOS)
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+(EPEL_ repositories must be configured_ for EL8)
+
+.. code-block:: console
+
+ $ dnf install python3-enlighten
+
+EL7 (RHEL/CentOS)
+^^^^^^^^^^^^^^^^^
(EPEL_ repositories must be configured_)
.. code-block:: console
- $ yum install python-enlighten
+ $ yum install python2-enlighten
+ $ yum install python36-enlighten
-Fedora
-------
+
+PKG
+---
+
+Arch Linux
+^^^^^^^^^^
.. code-block:: console
- $ dnf install python2-enlighten
- $ dnf install python3-enlighten
+ $ pacman -S python-enlighten
+
+
+DEB
+---
+
+Debian and Ubuntu
+^^^^^^^^^^^^^^^^^
+.. code-block:: console
+
+ $ apt-get install python3-enlighten
+
+
+.. _EPEL: https://fedoraproject.org/wiki/EPEL
+.. _configured: https://fedoraproject.org/wiki/EPEL#How_can_I_use_these_extra_packages.3F
Examples
=====================================
doc/faq.rst
=====================================
@@ -51,7 +51,7 @@ does not reference a valid TTY.
eol
Can you add support for _______ terminal?
----------------------------------------------------
+-----------------------------------------
We are happy to add support for as many terminals as we can.
However, not all terminals can be supported. There a few requirements.
@@ -79,3 +79,18 @@ However, not all terminals can be supported. There a few requirements.
3. Terminal dimensions must be detectable
The height and width of the terminal must be available to the running process.
+
+Why does ``RuntimeError: reentrant call`` get raised sometimes during a resize?
+-------------------------------------------------------------------------------
+
+This is caused when another thread or process is writing to a standard stream (STDOUT, STDERR)
+at the same time the resize signal handler is writing to the stream.
+
+Enlighten tries to detect when a program is threaded or running multiple processes and defer
+resize handling until the next normal write event. However, this condition is evaluated when
+the scroll area is set, typically when the first counter is added. If no threads or processes
+are detected at that time, and the value of threaded was not set explicitly, resize events will not
+be deferred.
+
+In order to guarantee resize handling is deferred, it is best to pass ``threaded=True`` when
+creating a manager instance.
=====================================
doc/install.rst
=====================================
@@ -22,10 +22,8 @@ PIP
RPM
---
-RPMs are available in the Fedora_ and EPEL_ repositories
-
-Fedora and EL8 (RHEL/CentOS/Scientific)
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Fedora and EL8 (RHEL/CentOS)
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(EPEL_ repositories must be configured_ for EL8)
@@ -33,8 +31,8 @@ Fedora and EL8 (RHEL/CentOS/Scientific)
$ dnf install python3-enlighten
-EL7 (RHEL/CentOS/Scientific)
-^^^^^^^^^^^^^^^^^^^^^^^^^^^
+EL7 (RHEL/CentOS)
+^^^^^^^^^^^^^^^^^
(EPEL_ repositories must be configured_)
@@ -44,6 +42,26 @@ EL7 (RHEL/CentOS/Scientific)
$ yum install python36-enlighten
+PKG
+---
+
+Arch Linux
+^^^^^^^^^^
+
+.. code-block:: console
+
+ $ pacman -S python-enlighten
+
+
+DEB
+---
+
+Debian and Ubuntu
+^^^^^^^^^^^^^^^^^
+.. code-block:: console
+
+ $ apt-get install python3-enlighten
+
+
.. _EPEL: https://fedoraproject.org/wiki/EPEL
-.. _Fedora: https://fedoraproject.org/
.. _configured: https://fedoraproject.org/wiki/EPEL#How_can_I_use_these_extra_packages.3F
=====================================
doc/spelling_wordlist.txt
=====================================
@@ -3,6 +3,7 @@ iterable
iterables
natively
programmatically
+resize
resizing
stdout
stderr
=====================================
enlighten/__init__.py
=====================================
@@ -16,6 +16,6 @@ from enlighten._manager import Manager, get_manager
from enlighten._util import EnlightenWarning, Justify
-__version__ = '1.6.2'
+__version__ = '1.7.2'
__all__ = ('Counter', 'EnlightenWarning', 'Justify', 'Manager',
'StatusBar', 'SubCounter', 'get_manager')
=====================================
enlighten/_counter.py
=====================================
@@ -18,7 +18,8 @@ import sys
import time
from enlighten._basecounter import BaseCounter, PrintableCounter
-from enlighten._util import EnlightenWarning, format_time, raise_from_none, warn_best_level
+from enlighten._util import (EnlightenWarning, FORMAT_MAP_SUPPORT, format_time,
+ raise_from_none, warn_best_level)
COUNTER_FMT = u'{desc}{desc_pad}{count:d} {unit}{unit_pad}' + \
u'[{elapsed}, {rate:.2f}{unit_pad}{unit}/s]{fill}'
@@ -43,9 +44,9 @@ except (AttributeError, TypeError): # pragma: no cover(Non-standard Terminal)
pass
# Reserved fields
-COUNTER_FIELDS = {'count', 'desc', 'desc_pad', 'elapsed', 'rate', 'unit', 'unit_pad',
+COUNTER_FIELDS = {'count', 'desc', 'desc_pad', 'elapsed', 'interval', 'rate', 'unit', 'unit_pad',
'bar', 'eta', 'len_total', 'percentage', 'total', 'fill'}
-RE_SUBCOUNTER_FIELDS = re.compile(r'(?:count|percentage|eta|rate)_\d+')
+RE_SUBCOUNTER_FIELDS = re.compile(r'(?:count|percentage|eta|interval|rate)_\d+')
class SubCounter(BaseCounter):
@@ -75,7 +76,7 @@ class SubCounter(BaseCounter):
Args:
color(str): Series color as a string or RGB tuple see :ref:`Series Color <series_color>`
count(int): Initial count (Default: 0)
- all_fields(bool): Populate ``rate`` and ``eta`` fields (Default: False)
+ all_fields(bool): Populate ``rate``, ``interval``, and ``eta`` fields (Default: False)
"""
if parent.count - parent.subcount - count < 0:
@@ -293,7 +294,8 @@ class Counter(PrintableCounter):
- desc(:py:class:`str`) - Value of ``desc``
- desc_pad(:py:class:`str`) - A single space if ``desc`` is set, otherwise empty
- elapsed(:py:class:`str`) - Time elapsed since instance was created
- - rate(:py:class:`float`) - Average increments per second since instance was created
+ - interval(:py:class:`float`) - Average seconds per iteration (inverse of rate)
+ - rate(:py:class:`float`) - Average iterations per second since instance was created
- unit(:py:class:`str`) - Value of ``unit``
- unit_pad(:py:class:`str`) - A single space if ``unit`` is set, otherwise empty
@@ -328,7 +330,8 @@ class Counter(PrintableCounter):
``all_fields`` set to :py:data:`True`:
- eta_n (:py:class:`str`) - Estimated time to completion (``bar_format`` only)
- - rate_n (:py:class:`float`) - Average increments per second since parent was created
+ - interval_n(:py:class:`float`) - Average seconds per iteration (inverse of rate)
+ - rate_n (:py:class:`float`) - Average iterations per second since parent was created
User-defined fields:
@@ -448,7 +451,7 @@ class Counter(PrintableCounter):
Each subcounter in the list will be in a tuple of (subcounter, percentage)
Fields in the dictionary are addressed in the Format documentation of this class
- When `bar_fields` is False, only subcounter count and rate fields are set.
+ When `bar_fields` is False, only subcounter count, interval, and rate fields are set.
percentage will be set to 0.0
"""
@@ -457,7 +460,7 @@ class Counter(PrintableCounter):
for num, subcounter in enumerate(self._subcounters, 1):
- fields['count_{0}'.format(num)] = subcounter.count
+ fields['count_%d' % num] = subcounter.count
if self.total and bar_fields:
subPercentage = subcounter.count / float(self.total)
@@ -465,30 +468,32 @@ class Counter(PrintableCounter):
subPercentage = 0.0
if bar_fields:
- fields['percentage_{0}'.format(num)] = subPercentage * 100
+ fields['percentage_%d' % num] = subPercentage * 100
# Save in tuple: count, percentage
subcounters.append((subcounter, subPercentage))
if subcounter.all_fields:
- interations = abs(subcounter.count - subcounter.start_count)
+ interations = float(abs(subcounter.count - subcounter.start_count))
if elapsed:
# Use float to force to float in Python 2
- rate = fields['rate_{0}'.format(num)] = interations / float(elapsed)
+ rate = fields['rate_%d' % num] = interations / elapsed
else:
- rate = fields['rate_{0}'.format(num)] = 0.0
+ rate = fields['rate_%d' % num] = 0.0
+
+ fields['interval_%d' % num] = rate ** -1 if rate else 0.0
if not bar_fields:
continue
if self.total == 0:
- fields['eta_{0}'.format(num)] = u'00:00'
+ fields['eta_%d' % num] = u'00:00'
elif rate:
- fields['eta_{0}'.format(num)] = format_time((self.total - interations) / rate)
+ fields['eta_%d' % num] = format_time((self.total - interations) / rate)
else:
- fields['eta_{0}'.format(num)] = u'?'
+ fields['eta_%d' % num] = u'?'
return subcounters, fields
@@ -507,7 +512,7 @@ class Counter(PrintableCounter):
width = width or self.manager.width
- iterations = abs(self.count - self.start_count)
+ iterations = float(abs(self.count - self.start_count))
fields = self.fields.copy()
fields.update(self._fields)
@@ -538,10 +543,11 @@ class Counter(PrintableCounter):
# Get rate. Elapsed could be 0 if counter was not updated and has a zero total.
if elapsed:
# Use iterations so a counter running backwards is accurate
- fields['rate'] = iterations / elapsed
+ rate = fields['rate'] = iterations / elapsed
else:
- fields['rate'] = 0.0
+ rate = fields['rate'] = 0.0
+ fields['interval'] = rate ** -1 if rate else 0.0
# Only process bar if total was given and n doesn't exceed total
if self.total is not None and self.count <= self.total:
@@ -557,9 +563,9 @@ class Counter(PrintableCounter):
percentage = self.count / float(self.total)
# Get eta
- if fields['rate']:
+ if rate:
# Use iterations so a counter running backwards is accurate
- fields['eta'] = format_time((self.total - iterations) / fields['rate'])
+ fields['eta'] = format_time((self.total - iterations) / rate)
else:
fields['eta'] = u'?'
@@ -576,7 +582,10 @@ class Counter(PrintableCounter):
# Partially format
try:
- rtn = self.bar_format.format(**fields)
+ if FORMAT_MAP_SUPPORT:
+ rtn = self.bar_format.format_map(fields)
+ else: # pragma: no cover
+ rtn = self.bar_format.format(**fields)
except KeyError as e:
raise_from_none(ValueError('%r specified in format, but not provided' % e.args[0]))
@@ -616,7 +625,10 @@ class Counter(PrintableCounter):
fields['count_0'] = self.count - sum(sub[0].count for sub in subcounters)
try:
- rtn = self.counter_format.format(**fields)
+ if FORMAT_MAP_SUPPORT:
+ rtn = self.counter_format.format_map(fields)
+ else: # pragma: no cover
+ rtn = self.counter_format.format(**fields)
except KeyError as e:
raise_from_none(ValueError('%r specified in format, but not provided' % e.args[0]))
@@ -648,7 +660,8 @@ class Counter(PrintableCounter):
Args:
color(str): Series color as a string or RGB tuple see :ref:`Series Color <series_color>`
count(int): Initial count (Default: 0)
- all_fields(bool): Populate ``rate`` and ``eta`` formatting fields (Default: False)
+ all_fields(bool): Populate ``rate``, ``interval``, and ``eta``
+ formatting fields (Default: False)
Returns:
:py:class:`SubCounter`: Subcounter instance
=====================================
enlighten/_manager.py
=====================================
@@ -13,13 +13,16 @@ Provides Manager class
import atexit
from collections import OrderedDict
+import multiprocessing
import signal
import sys
+import threading
import time
+from blessed import Terminal
+
from enlighten._counter import Counter
from enlighten._statusbar import StatusBar
-from enlighten._terminal import Terminal
RESIZE_SUPPORTED = hasattr(signal, 'SIGWINCH')
@@ -36,6 +39,8 @@ class Manager(object):
below. (Default: :py:data:`None`)
enabled(bool): Status (Default: True)
no_resize(bool): Disable resizing support
+ threaded(bool): When True resize handling is deferred until next write (Default: False
+ unless multiple threads or multiple processes are detected)
kwargs(Dict[str, Any]): Any additional :py:term:`keyword arguments<keyword argument>`
will be used as default values when :py:meth:`counter` is called.
@@ -70,6 +75,7 @@ class Manager(object):
self.enabled = kwargs.get('enabled', True) # Double duty for counters
self.no_resize = kwargs.pop('no_resize', False)
self.set_scroll = kwargs.pop('set_scroll', True)
+ self.threaded = kwargs.pop('threaded', None) # Defer evaluation
self.term = Terminal(stream=self.stream)
# Set up companion stream
@@ -95,6 +101,8 @@ class Manager(object):
self.companion_term = None
self.autorefresh = []
+ self._buffer = []
+ self._companion_buffer = []
self.height = self.term.height
self.process_exit = False
self.refresh_lock = False
@@ -264,9 +272,14 @@ class Manager(object):
# Set semaphore to trigger resize on next write
self._resize = True
- # Reset update time to avoid any delay in resize
- for counter in self.counters:
- counter.last_update = 0
+ if self.threaded:
+ # Reset update time to avoid any delay in resize
+ for counter in self.counters:
+ counter.last_update = 0
+
+ else:
+ # If not threaded, handle resize now
+ self._resize_handler()
def _resize_handler(self):
"""
@@ -276,30 +289,35 @@ class Manager(object):
"""
# Make sure only one resize handler is running
- try:
- assert self.resize_lock
- except AssertionError:
+ if self.resize_lock:
+ return
- self.resize_lock = True
- term = self.term
+ self.resize_lock = True
+ buffer = self._buffer
+ term = self.term
- term.clear_cache()
- newHeight = term.height
- newWidth = term.width
+ oldHeight = self.height
+ newHeight = self.height = term.height
+ newWidth = term.width
- if newWidth < self.width:
- offset = (self.scroll_offset - 1) * (1 + self.width // newWidth)
- term.move_to(0, max(0, newHeight - offset))
- self.stream.write(term.clear_eos)
+ if newHeight < oldHeight:
+ buffer.append(term.move(max(0, newHeight - self.scroll_offset), 0))
+ buffer.append(u'\n' * (2 * max(self.counters.values())))
+ elif newHeight > oldHeight and self.threaded:
+ buffer.append(term.move(newHeight, 0))
+ buffer.append(u'\n' * (self.scroll_offset - 1))
- self.width = newWidth
- self._set_scroll_area(force=True)
+ buffer.append(term.move(max(0, newHeight - self.scroll_offset), 0))
+ buffer.append(term.clear_eos)
- for counter in self.counters:
- counter.refresh(flush=False)
- self._flush_streams()
+ self.width = newWidth
+ self._set_scroll_area(force=True)
- self.resize_lock = False
+ for counter in self.counters:
+ counter.refresh(flush=False)
+ self._flush_streams()
+
+ self.resize_lock = False
def _set_scroll_area(self, force=False):
"""
@@ -325,40 +343,58 @@ class Manager(object):
if not self.process_exit:
atexit.register(self._at_exit)
if not self.no_resize and RESIZE_SUPPORTED:
+ if self.threaded is None:
+ self.threaded = (
+ threading.active_count() > 1 # Multiple threads
+ or multiprocessing.active_children() # Main process with children
+ or multiprocessing.current_process().name != 'MainProcess' # Child process
+ )
signal.signal(signal.SIGWINCH, self._stage_resize)
self.process_exit = True
if self.set_scroll:
+ buffer = self._buffer
term = self.term
- newHeight = term.height
- scrollPosition = max(0, newHeight - self.scroll_offset)
+ scrollPosition = max(0, self.height - self.scroll_offset)
- if force or use_new or newHeight != self.height:
- self.height = newHeight
+ if force or use_new:
# Add line feeds so we don't overwrite existing output
if use_new:
- term.move_to(0, max(0, newHeight - oldOffset))
- self.stream.write(u'\n' * (newOffset - oldOffset))
+ buffer.append(term.move(max(0, self.height - oldOffset), 0))
+ buffer.append(u'\n' * (newOffset - oldOffset))
# Reset scroll area
- self.term.change_scroll(scrollPosition)
+ buffer.append(term.hide_cursor)
+ buffer.append(term.csr(0, scrollPosition))
# Always reset position
- term.move_to(0, scrollPosition)
+ buffer.append(term.move(scrollPosition, 0))
if self.companion_term is not None:
- self.companion_term.move_to(0, scrollPosition)
+ self._companion_buffer.append(term.move(scrollPosition, 0))
def _flush_streams(self):
"""
Convenience method for flushing streams
"""
+ buffer = self._buffer
+ companion_buffer = self._companion_buffer
+
+ if buffer:
+ self.stream.write(u''.join(buffer))
+
self.stream.flush()
+
if self.companion_stream is not None:
+ if companion_buffer:
+ self.companion_stream.write(u''.join(companion_buffer))
self.companion_stream.flush()
+ del buffer[:] # Python 2.7 does not support list.clear()
+ del companion_buffer[:]
+
def _at_exit(self):
"""
Resets terminal to normal configuration
@@ -369,13 +405,14 @@ class Manager(object):
try:
term = self.term
+ buffer = self._buffer
if self.set_scroll:
- term.reset()
- else:
- term.move_to(0, term.height)
+ buffer.append(self.term.normal_cursor)
+ buffer.append(self.term.csr(0, self.height - 1))
- self.term.feed()
+ buffer.append(term.move(term.height, 0))
+ buffer.append(term.cud1 or u'\n')
self._flush_streams()
@@ -418,8 +455,9 @@ class Manager(object):
if not self.enabled:
return
+ buffer = self._buffer
term = self.term
- stream = self.stream
+ height = term.height
positions = self.counters.values()
if not self.no_resize and RESIZE_SUPPORTED:
@@ -428,20 +466,22 @@ class Manager(object):
try:
for num in range(self.scroll_offset - 1, 0, -1):
if num not in positions:
- term.move_to(0, term.height - num)
- stream.write(term.clear_eol)
+ buffer.append(term.move(height - num, 0))
+ buffer.append(term.clear_eol)
finally:
+ # Reset terminal
if self.set_scroll:
-
- self.term.reset()
-
+ buffer.append(term.normal_cursor)
+ buffer.append(term.csr(0, self.height - 1))
if self.companion_term:
- self.companion_term.reset()
+ self._companion_buffer.extend((term.normal_cursor,
+ term.csr(0, self.height - 1),
+ term.move(height, 0)))
- else:
- term.move_to(0, term.height)
+ # Re-home cursor
+ buffer.append(term.move(height, 0))
self.process_exit = False
self.enabled = False
@@ -450,7 +490,7 @@ class Manager(object):
# Feed terminal if lowest position isn't cleared
if 1 in positions:
- term.feed()
+ buffer.append(term.cud1 or '\n')
self._flush_streams()
@@ -490,9 +530,10 @@ class Manager(object):
output = output(**kwargs)
try:
- term.move_to(0, term.height - position)
- # Include \r and term call to cover most conditions
- self.stream.write(u'\r' + term.clear_eol + output)
+ self._buffer.extend((term.move(self.height - position, 0),
+ u'\r',
+ term.clear_eol,
+ output))
finally:
# Reset position and scrolling
@@ -511,22 +552,17 @@ class Manager(object):
Refresh any bars specified for auto-refresh
"""
- # Make sure this is only running once
- try:
- assert self.refresh_lock
- except AssertionError:
-
- self.refresh_lock = True
- current_time = time.time()
+ self.refresh_lock = True
+ current_time = time.time()
- for counter in self.autorefresh:
+ for counter in self.autorefresh:
- if counter in exclude or counter.min_delta > current_time - counter.last_update:
- continue
+ if counter in exclude or counter.min_delta > current_time - counter.last_update:
+ continue
- counter.refresh()
+ counter.refresh()
- self.refresh_lock = False
+ self.refresh_lock = False
def get_manager(stream=None, counterclass=Counter, **kwargs):
=====================================
enlighten/_statusbar.py
=====================================
@@ -14,7 +14,8 @@ Provides StatusBar class
import time
from enlighten._basecounter import PrintableCounter
-from enlighten._util import EnlightenWarning, format_time, Justify, raise_from_none, warn_best_level
+from enlighten._util import (EnlightenWarning, FORMAT_MAP_SUPPORT, format_time,
+ Justify, raise_from_none, warn_best_level)
STATUS_FIELDS = {'elapsed', 'fill'}
@@ -203,7 +204,10 @@ class StatusBar(PrintableCounter):
# Format
try:
- rtn = self.status_format.format(**fields)
+ if FORMAT_MAP_SUPPORT:
+ rtn = self.status_format.format_map(fields)
+ else: # pragma: no cover
+ rtn = self.status_format.format(**fields)
except KeyError as e:
raise_from_none(ValueError('%r specified in format, but not provided' % e.args[0]))
=====================================
enlighten/_terminal.py deleted
=====================================
@@ -1,81 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2017 - 2018 Avram Lubkin, All Rights Reserved
-
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0. If a copy of the MPL was not distributed with this
-# file, You can obtain one at http://mozilla.org/MPL/2.0/.
-
-"""
-**Enlighten terminal submodule**
-
-Provides Terminal class
-"""
-
-from blessed import Terminal as _Terminal
-
-
-class Terminal(_Terminal):
- """
- Subclass of :py:class:`blessings.Terminal`
-
- Adds convenience methods and caching for width and height
- """
-
- def __init__(self, *args, **kwargs):
-
- super(Terminal, self).__init__(*args, **kwargs)
- self._cache = {}
-
- def reset(self):
- """
- Reset scroll window and cursor to default
- """
-
- self.stream.write(self.normal_cursor)
- self.stream.write(self.csr(0, self.height - 1))
- self.stream.write(self.move(self.height, 0))
-
- def feed(self):
- """
- Feed a single line
- """
-
- self.stream.write(self.cud1 or '\n')
-
- def change_scroll(self, position):
- """
- Args:
- position (int): Vertical location to end scroll window
-
- Change scroll window
- """
-
- self.stream.write(self.hide_cursor)
- self.stream.write(self.csr(0, position))
- self.stream.write(self.move(position, 0))
-
- def move_to(self, x_pos, y_pos):
- """
- Move cursor to specified position
- """
-
- self.stream.write(self.move(y_pos, x_pos))
-
- def _height_and_width(self):
- """
- Override for blessings.Terminal._height_and_width
- Adds caching
- """
-
- try:
- return self._cache['height_and_width']
- except KeyError:
- h_and_w = self._cache['height_and_width'] = super(Terminal, self)._height_and_width()
- return h_and_w
-
- def clear_cache(self):
- """
- Clear cached terminal returns
- """
-
- self._cache.clear()
=====================================
enlighten/_util.py
=====================================
@@ -23,6 +23,7 @@ except NameError:
BASESTRING = str
BASE_DIR = os.path.basename(os.path.dirname(__file__))
+FORMAT_MAP_SUPPORT = sys.version_info[:2] >= (3, 2)
class EnlightenWarning(Warning):
@@ -55,17 +56,17 @@ def format_time(seconds):
# Always do minutes and seconds in mm:ss format
minutes = seconds // 60
hours = minutes // 60
- rtn = u'{0:02.0f}:{1:02.0f}'.format(minutes % 60, seconds % 60)
+ rtn = u'%02.0f:%02.0f' % (minutes % 60, seconds % 60)
# Add hours if there are any
if hours:
- rtn = u'{0:d}h {1}'.format(int(hours % 24), rtn)
+ rtn = u'%dh %s' % (int(hours % 24), rtn)
# Add days if there are any
days = int(hours // 24)
if days:
- rtn = u'{0:d}d {1}'.format(days, rtn)
+ rtn = u'%dd %s' % (days, rtn)
return rtn
=====================================
setup.cfg
=====================================
@@ -36,6 +36,7 @@ source-dir = doc
build-dir = build/doc
all_files = True
fresh-env = True
+warning-is-error = True
[aliases]
spelling=build_sphinx --builder spelling
=====================================
setup.py
=====================================
@@ -56,6 +56,7 @@ setup(
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
+ 'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: Implementation :: CPython',
'Programming Language :: Python :: Implementation :: PyPy',
'Topic :: Utilities',
=====================================
setup_helpers.py
=====================================
@@ -82,6 +82,31 @@ def spelling_clean_dir(path):
os.unlink(os.path.join(path, filename))
+def check_rst2html(path):
+ """
+ Checks for warnings when doing ReST to HTML conversion
+ """
+
+ # pylint: disable=import-error,import-outside-toplevel
+ from contextlib import redirect_stderr # Import here because it breaks <= Python 3.4
+ from docutils.core import publish_file # Import here because only available in doc tests
+
+ stderr = io.StringIO()
+
+ # This will exit with status if there is a bad enough error
+ with redirect_stderr(stderr):
+ output = publish_file(source_path=path, writer_name='html',
+ enable_exit_status=True, destination_path='/dev/null')
+
+ warning_text = stderr.getvalue()
+
+ if warning_text or not output:
+ print(warning_text)
+ return 1
+
+ return 0
+
+
if __name__ == '__main__':
# Do nothing if no arguments were given
@@ -100,6 +125,12 @@ if __name__ == '__main__':
else:
sys.exit(print_all_spelling_errors(DIR_SPELLING))
+ # Check file for Rest to HTML conversion
+ if sys.argv[1] == 'rst2html':
+ if len(sys.argv) < 3:
+ sys.exit('Missing filename for ReST to HTML check')
+ sys.exit(check_rst2html(sys.argv[2]))
+
# Unknown option
else:
sys.stderr.write('Unknown option: %s' % sys.argv[1])
=====================================
tests/test_counter.py
=====================================
@@ -276,12 +276,14 @@ class TestCounter(TestCase):
self.assertEqual(subcounters, [(subcounter1, 0.0), (subcounter2, 0.4), (subcounter3, 0.1)])
self.assertEqual(fields, {'percentage_1': 0.0, 'percentage_2': 40.0, 'percentage_3': 10.0,
'count_1': 0, 'count_2': 4, 'count_3': 1,
+ 'interval_2': 2.0, 'interval_3': 0.0,
'rate_2': 0.5, 'eta_2': '00:12', 'rate_3': 0.0, 'eta_3': '?'})
subcounters, fields = self.ctr._get_subcounters(0)
self.assertEqual(subcounters, [(subcounter1, 0.0), (subcounter2, 0.4), (subcounter3, 0.1)])
self.assertEqual(fields, {'percentage_1': 0.0, 'percentage_2': 40.0, 'percentage_3': 10.0,
'count_1': 0, 'count_2': 4, 'count_3': 1,
+ 'interval_2': 0.0, 'interval_3': 0.0,
'rate_2': 0.0, 'eta_2': '?', 'rate_3': 0.0, 'eta_3': '?'})
self.ctr = Counter(total=0, desc='Test', unit='ticks', manager=self.manager)
@@ -289,7 +291,7 @@ class TestCounter(TestCase):
subcounters, fields = self.ctr._get_subcounters(8)
self.assertEqual(subcounters, [(subcounter1, 0.0)])
self.assertEqual(fields, {'percentage_1': 0.0, 'count_1': 0,
- 'rate_1': 0.0, 'eta_1': '00:00'})
+ 'interval_1': 0.0, 'rate_1': 0.0, 'eta_1': '00:00'})
def test_get_subcounter_counter_format(self):
self.ctr.count = 12
@@ -301,6 +303,7 @@ class TestCounter(TestCase):
subcounters, fields = self.ctr._get_subcounters(8, bar_fields=False)
self.assertEqual(subcounters, [(subcounter1, 0.0), (subcounter2, 0.0), (subcounter3, 0.0)])
self.assertEqual(fields, {'count_1': 0, 'count_2': 6, 'count_3': 1,
+ 'interval_2': 0.75 ** -1, 'interval_3': 0.0,
'rate_2': 0.75, 'rate_3': 0.0})
def test_remove(self):
@@ -785,3 +788,20 @@ class TestCounter(TestCase):
with self.assertWarns(EnlightenWarning) as warn:
ctr.format()
self.assertRegex(__file__, warn.filename)
+
+ def test_builtin_bar_fields(self):
+ """
+ Ensure all built-in fields are populated as expected
+ """
+
+ bar_fields = tuple(field for field in enlighten._counter.COUNTER_FIELDS if field != 'fill')
+ bar_format = u', '.join(u'%s: {%s}' % (field, field) for field in sorted(bar_fields))
+
+ ctr = Counter(stream=self.tty.stdout, total=100, bar_format=bar_format,
+ unit='parsecs', desc='Kessel runs')
+
+ ctr.count = 50
+ fields = 'bar: , count: 50, desc: Kessel runs, desc_pad: , elapsed: 00:50, eta: 00:50, ' \
+ 'interval: 1.0, len_total: 3, percentage: 50.0, rate: 1.0, total: 100, ' \
+ 'unit: parsecs, unit_pad: '
+ self.assertEqual(ctr.format(elapsed=50, width=80), fields)
=====================================
tests/test_manager.py
=====================================
@@ -19,7 +19,7 @@ from tests import (unittest, TestCase, mock, MockTTY, MockCounter,
redirect_output, OUTPUT, STDOUT_NO_FD)
-TERMINAL = 'enlighten._terminal.Terminal'
+TERMINAL = 'blessed.Terminal'
# pylint: disable=missing-docstring, protected-access, too-many-statements, too-many-public-methods
@@ -254,26 +254,55 @@ class TestManager(TestCase):
def test_write_no_flush(self):
"""
- No real difference in our tests because stream is flushed on each new line
- If we don't flush, reading will just hang
-
- But we added this for coverage and as a framework future tests
+ Output is stored in buffer, but not flushed to stream
"""
- msg = 'test message'
+ msg = u'test message'
with mock.patch('enlighten._manager.Manager._set_scroll_area') as ssa:
- manager = _manager.Manager(stream=self.tty.stdout)
+ manager = _manager.Manager(stream=self.tty.stdout, companion_stream=OUTPUT)
counter = manager.counter(position=3)
term = manager.term
manager.write(msg, counter=counter, flush=False)
+ self.assertEqual(manager._buffer,
+ [term.move(term.height - 3, 0), '\r', term.clear_eol, msg])
+ self.assertEqual(manager._companion_buffer, [])
+
self.tty.stdout.write(u'X\n')
- # Carriage return is getting converted to newline
- self.assertEqual(self.tty.stdread.readline(),
- term.move(22, 0) + '\r' + term.clear_eol + msg + 'X\n')
+
+ # No output
+ self.assertEqual(self.tty.stdread.readline(), 'X\n')
self.assertEqual(ssa.call_count, 2)
+ def test_flush_companion_buffer(self):
+
+ """
+ Output is stored in buffer, but only written in companion stream is defined
+ """
+
+ manager = _manager.Manager(stream=self.tty.stdout)
+ msg = u'test message'
+
+ manager._companion_buffer = [msg]
+
+ manager._flush_streams()
+
+ # Companion buffer flushed, but not outputted
+ self.assertEqual(manager._companion_buffer, [])
+ self.tty.stdout.write(u'X\n')
+ self.assertEqual(self.tty.stdread.readline(), 'X\n')
+
+ # set companion stream and test again
+ manager.companion_stream = OUTPUT
+ manager._companion_buffer = [msg]
+ manager._flush_streams()
+
+ self.assertEqual(manager._companion_buffer, [])
+ self.tty.stdout.write(u'X\n')
+ self.assertEqual(self.tty.stdread.readline(), 'X\n')
+ self.assertEqual(OUTPUT.getvalue(), msg)
+
def test_autorefresh(self):
"""
Ensure auto-refreshed counters are updated when others are
@@ -302,6 +331,17 @@ class TestManager(TestCase):
self.assertRegex(output, 'counter2')
self.assertNotRegex(output, 'counter1')
+ # If already auto-refreshing, skip
+ manager.refresh_lock = True
+ counter1.last_update = 0
+ counter2.refresh()
+ # Have to explicitly flush
+ manager._flush_streams()
+ self.tty.stdout.write(u'X\n')
+ output = self.tty.stdread.readline()
+ self.assertRegex(output, 'counter2')
+ self.assertNotRegex(output, 'counter1')
+
def test_set_scroll_area_disabled(self):
manager = _manager.Manager(stream=self.tty.stdout,
counter_class=MockCounter, set_scroll=False)
@@ -317,8 +357,7 @@ class TestManager(TestCase):
manager.scroll_offset = 4
manager._set_scroll_area()
- self.tty.stdout.write(u'X\n')
- self.assertEqual(self.tty.stdread.readline(), manager.term.move(21, 0) + 'X\n')
+ self.assertEqual(manager._buffer, [manager.term.move(21, 0)])
def test_set_scroll_area_companion(self):
"""
@@ -332,9 +371,9 @@ class TestManager(TestCase):
term = manager.term
manager._set_scroll_area()
- self.tty.stdout.write(u'X\n')
- self.assertEqual(self.tty.stdread.readline(),
- term.move(21, 0) + term.move(21, 0) + 'X\n')
+
+ self.assertEqual(manager._buffer, [term.move(21, 0)])
+ self.assertEqual(manager._companion_buffer, [term.move(21, 0)])
def test_set_scroll_area(self):
manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter)
@@ -344,73 +383,84 @@ class TestManager(TestCase):
self.assertEqual(manager.scroll_offset, 1)
self.assertFalse(manager.process_exit)
self.assertNotEqual(signal.getsignal(signal.SIGWINCH), manager._stage_resize)
+ old_offset = manager.scroll_offset
with mock.patch('enlighten._manager.atexit') as atexit:
- with mock.patch.object(term, 'change_scroll'):
- manager._set_scroll_area()
- self.assertEqual(term.change_scroll.call_count, 1) # pylint: disable=no-member
-
- self.assertEqual(manager.scroll_offset, 4)
- self.assertEqual(signal.getsignal(signal.SIGWINCH), manager._stage_resize)
-
- self.assertEqual(stdread.readline(), term.move(24, 0) + '\n')
- self.assertEqual(stdread.readline(), '\n')
- self.assertEqual(stdread.readline(), '\n')
-
- self.assertTrue(manager.process_exit)
-
- atexit.register.assert_called_with(manager._at_exit)
+ manager._set_scroll_area()
+ self.assertEqual(manager.scroll_offset, 4)
+ self.assertEqual(signal.getsignal(signal.SIGWINCH), manager._stage_resize)
+ self.assertTrue(manager.process_exit)
+ atexit.register.assert_called_with(manager._at_exit)
+
+ offset = manager.scroll_offset
+ scroll_position = term.height - offset
+ self.assertEqual(manager._buffer,
+ [term.move(term.height - old_offset, 0),
+ '\n' * (offset - old_offset),
+ term.hide_cursor, term.csr(0, scroll_position),
+ term.move(scroll_position, 0)])
+
+ # No companion buffer defined
+ self.assertEqual(manager._companion_buffer, [])
+
+ # Make sure nothing was flushed
self.tty.stdout.write(u'X\n')
- self.assertEqual(stdread.readline(), term.move(21, 0) + 'X\n')
+ self.assertEqual(stdread.readline(), 'X\n')
# Run it again and make sure exit handling isn't reset
+ del manager._buffer[:]
+ del manager._companion_buffer[:]
with mock.patch('enlighten._manager.atexit') as atexit:
- with mock.patch.object(term, 'change_scroll'):
- manager._set_scroll_area(force=True)
- self.assertEqual(term.change_scroll.call_count, 1) # pylint: disable=no-member
+ manager._set_scroll_area(force=True)
- self.assertFalse(atexit.register.called)
+ self.assertFalse(atexit.register.called)
+ self.assertEqual(manager._buffer,
+ [term.hide_cursor, term.csr(0, scroll_position),
+ term.move(scroll_position, 0)])
# Set max counter lower and make sure scroll_offset hasn't changed
manager.counters['dummy'] = 1
with mock.patch('enlighten._manager.atexit') as atexit:
- with mock.patch.object(term, 'change_scroll'):
- manager._set_scroll_area()
+ manager._set_scroll_area()
+
self.assertEqual(manager.scroll_offset, 4)
- def test_set_scroll_area_height(self):
+ def test_set_scroll_area_force(self):
manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter)
manager.counters['dummy'] = 3
manager.scroll_offset = 4
manager.height = 20
+ scroll_position = manager.height - manager.scroll_offset
term = manager.term
with mock.patch('enlighten._manager.atexit') as atexit:
- with mock.patch.object(term, 'change_scroll'):
- manager._set_scroll_area()
- self.assertEqual(term.change_scroll.call_count, 1) # pylint: disable=no-member
+ manager._set_scroll_area(force=True)
self.assertEqual(manager.scroll_offset, 4)
- self.assertEqual(manager.height, 25)
self.assertTrue(manager.process_exit)
- term.stream.write(u'X\n')
- self.assertEqual(self.tty.stdread.readline(), term.move(21, 0) + 'X\n')
+ self.assertEqual(manager._buffer,
+ [term.hide_cursor,
+ term.csr(0, scroll_position),
+ term.move(scroll_position, 0)])
+ self.assertEqual(manager._companion_buffer, [])
atexit.register.assert_called_with(manager._at_exit)
def test_at_exit(self):
tty = MockTTY()
- with mock.patch('%s.reset' % TERMINAL) as reset:
+ try:
with mock.patch.object(tty, 'stdout', wraps=tty.stdout) as mockstdout:
manager = _manager.Manager(stream=tty.stdout, counter_class=MockCounter)
term = manager.term
+ reset = (term.normal_cursor +
+ term.csr(0, term.height - 1) +
+ term.move(term.height, 0))
# process_exit is False
manager._at_exit()
- self.assertFalse(reset.called)
self.assertFalse(mockstdout.flush.called)
# No output
tty.stdout.write(u'X\n')
@@ -420,129 +470,126 @@ class TestManager(TestCase):
manager.process_exit = True
manager.set_scroll = False
manager._at_exit()
- self.assertFalse(reset.called)
self.assertEqual(mockstdout.flush.call_count, 1)
self.assertEqual(tty.stdread.readline(), term.move(25, 0) + term.cud1)
# process_exit is True, set_scroll True
manager.set_scroll = True
manager._at_exit()
- self.assertEqual(reset.call_count, 1)
self.assertEqual(mockstdout.flush.call_count, 2)
- self.assertEqual(tty.stdread.readline(), term.cud1)
+ self.assertEqual(tty.stdread.readline(), reset + term.cud1)
# Ensure companion stream gets flushed
manager.companion_stream = tty.stdout
manager._at_exit()
- self.assertEqual(reset.call_count, 2)
self.assertEqual(mockstdout.flush.call_count, 4)
- self.assertEqual(tty.stdread.readline(), term.cud1)
+ self.assertEqual(tty.stdread.readline(), reset + term.cud1)
term = manager.term
- # Ensure no errors if tty closes before _at_exit is called
- tty.close()
- manager._at_exit()
+ finally:
+ # Ensure no errors if tty closes before _at_exit is called
+ tty.close()
+ manager._at_exit()
def test_stop(self):
- with mock.patch('%s.reset' % TERMINAL) as reset:
- manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter)
- manager.counters[MockCounter(manager=manager)] = 3
- manager.counters[MockCounter(manager=manager)] = 4
- term = manager.term
- self.assertIsNone(manager.companion_term)
+ manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter)
+ manager.counters[MockCounter(manager=manager)] = 3
+ manager.counters[MockCounter(manager=manager)] = 4
+ term = manager.term
+ self.assertIsNone(manager.companion_term)
- with mock.patch('enlighten._manager.atexit'):
- with mock.patch.object(term, 'change_scroll'):
- manager._set_scroll_area()
+ with mock.patch('enlighten._manager.atexit'):
+ manager._set_scroll_area()
- self.assertEqual(manager.scroll_offset, 5)
- self.assertEqual(signal.getsignal(signal.SIGWINCH), manager._stage_resize)
- self.assertTrue(manager.process_exit)
+ self.assertEqual(manager.scroll_offset, 5)
+ self.assertEqual(signal.getsignal(signal.SIGWINCH), manager._stage_resize)
+ self.assertTrue(manager.process_exit)
- # Clear stream
- self.tty.stdout.write(u'X\n')
- for _ in range(4 + 1):
- self.tty.stdread.readline()
+ # Clear buffer
+ del manager._buffer[:]
- self.assertFalse(reset.called)
- manager.enabled = False
- manager.stop()
+ manager.enabled = False
+ manager.stop()
- # No output, No changes
- self.tty.stdout.write(u'X\n')
- self.assertEqual(self.tty.stdread.readline(), 'X\n')
- self.assertEqual(signal.getsignal(signal.SIGWINCH), manager._stage_resize)
- self.assertTrue(manager.process_exit)
+ # No output, No changes
+ self.tty.stdout.write(u'X\n')
+ self.assertEqual(self.tty.stdread.readline(), 'X\n')
+ self.assertEqual(signal.getsignal(signal.SIGWINCH), manager._stage_resize)
+ self.assertTrue(manager.process_exit)
- manager.enabled = True
- manager.stop()
+ manager.enabled = True
+ manager.stop()
- self.assertEqual(signal.getsignal(signal.SIGWINCH), manager.sigwinch_orig)
- self.assertEqual(reset.call_count, 1)
+ self.assertEqual(signal.getsignal(signal.SIGWINCH), manager.sigwinch_orig)
- self.tty.stdout.write(u'X\n')
- self.assertEqual(self.tty.stdread.readline(), term.move(23, 0) + term.clear_eol +
- term.move(24, 0) + term.clear_eol + 'X\n')
- self.assertFalse(manager.process_exit)
- self.assertFalse(manager.enabled)
- for counter in manager.counters:
- self.assertFalse(counter.enabled)
+ self.tty.stdout.write(u'X\n')
+ self.assertEqual(self.tty.stdread.readline(),
+ term.move(term.height - 2, 0) + term.clear_eol +
+ term.move(term.height - 1, 0) + term.clear_eol +
+ term.normal_cursor + term.csr(0, term.height - 1) +
+ term.move(term.height, 0) + 'X\n')
+
+ self.assertFalse(manager.process_exit)
+ self.assertFalse(manager.enabled)
+ for counter in manager.counters:
+ self.assertFalse(counter.enabled)
def test_stop_no_set_scroll(self):
"""
set_scroll is False
"""
- with mock.patch('%s.reset' % TERMINAL) as reset:
- manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter,
- set_scroll=False)
- manager.counters[MockCounter(manager=manager)] = 3
- manager.counters[MockCounter(manager=manager)] = 4
- term = manager.term
+ manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter,
+ set_scroll=False)
+ manager.counters[MockCounter(manager=manager)] = 3
+ manager.counters[MockCounter(manager=manager)] = 4
+ term = manager.term
- with mock.patch('enlighten._manager.atexit'):
- with mock.patch.object(term, 'change_scroll'):
- manager._set_scroll_area()
+ with mock.patch('enlighten._manager.atexit'):
+ with mock.patch.object(term, 'change_scroll'):
+ manager._set_scroll_area()
- self.assertEqual(manager.scroll_offset, 5)
- self.assertEqual(signal.getsignal(signal.SIGWINCH), manager._stage_resize)
- self.assertTrue(manager.process_exit)
+ self.assertEqual(manager.scroll_offset, 5)
+ self.assertEqual(signal.getsignal(signal.SIGWINCH), manager._stage_resize)
+ self.assertTrue(manager.process_exit)
- # Stream empty
- self.tty.stdout.write(u'X\n')
- self.assertEqual(self.tty.stdread.readline(), 'X\n')
+ # Stream empty
+ self.tty.stdout.write(u'X\n')
+ self.assertEqual(self.tty.stdread.readline(), 'X\n')
- manager.stop()
+ manager.stop()
- self.assertEqual(signal.getsignal(signal.SIGWINCH), manager.sigwinch_orig)
- self.assertFalse(reset.called)
+ self.assertEqual(signal.getsignal(signal.SIGWINCH), manager.sigwinch_orig)
+ self.assertFalse(manager.process_exit)
- self.tty.stdout.write(u'X\n')
- self.assertEqual(self.tty.stdread.readline(), term.move(23, 0) + term.clear_eol +
- term.move(24, 0) + term.clear_eol + term.move(25, 0) + 'X\n')
- self.assertFalse(manager.process_exit)
+ self.tty.stdout.write(u'X\n')
+ self.assertEqual(self.tty.stdread.readline(),
+ term.move(term.height - 2, 0) + term.clear_eol +
+ term.move(term.height - 1, 0) + term.clear_eol +
+ term.move(25, 0) + 'X\n')
def test_stop_never_used(self):
"""
In this case, _set_scroll_area() was never called
"""
- with mock.patch('%s.reset' % TERMINAL) as reset:
- manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter)
- manager.counters[MockCounter(manager=manager)] = 3
- manager.counters[MockCounter(manager=manager)] = 4
- self.assertFalse(manager.process_exit)
+ manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter)
+ manager.counters[MockCounter(manager=manager)] = 3
+ manager.counters[MockCounter(manager=manager)] = 4
+ term = manager.term
- manager.stop()
+ self.assertFalse(manager.process_exit)
- self.assertEqual(signal.getsignal(signal.SIGWINCH), manager.sigwinch_orig)
- self.assertEqual(reset.call_count, 1)
+ manager.stop()
- # No output
+ self.assertEqual(signal.getsignal(signal.SIGWINCH), manager.sigwinch_orig)
+
+ # Only reset terminal
self.tty.stdout.write(u'X\n')
- self.assertEqual(self.tty.stdread.readline(), 'X\n')
+ reset = term.normal_cursor + term.csr(0, term.height - 1) + term.move(term.height, 0)
+ self.assertEqual(self.tty.stdread.readline(), reset + 'X\n')
def test_stop_companion(self):
"""
@@ -557,13 +604,23 @@ class TestManager(TestCase):
term = manager.term
with mock.patch('enlighten._manager.atexit'):
- with mock.patch.object(term, 'change_scroll'):
- manager._set_scroll_area()
+ manager._set_scroll_area()
- with mock.patch.object(manager.companion_term, 'reset') as compReset:
+ del manager._buffer[:]
+ del manager._companion_buffer[:]
+
+ with mock.patch.object(manager, '_flush_streams'):
manager.stop()
- self.assertEqual(compReset.call_count, 1)
+ self.assertEqual(manager._buffer,
+ [term.move(term.height - 2, 0), term.clear_eol,
+ term.move(term.height - 1, 0), term.clear_eol,
+ term.normal_cursor, term.csr(0, term.height - 1),
+ term.move(term.height, 0)])
+
+ self.assertEqual(manager._companion_buffer,
+ [term.normal_cursor, term.csr(0, term.height - 1),
+ term.move(term.height, 0)])
def test_stop_position_1(self):
"""
@@ -571,19 +628,27 @@ class TestManager(TestCase):
"""
manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter)
+ term = manager.term
manager.counters[MockCounter(manager=manager)] = 3
- with mock.patch.object(manager.term, 'feed') as termfeed:
+ with mock.patch.object(manager, '_flush_streams'):
manager.stop()
- self.assertFalse(termfeed.called)
+ self.assertEqual(manager._buffer,
+ [term.normal_cursor, term.csr(0, term.height - 1),
+ term.move(term.height, 0)])
+
+ del manager._buffer[:]
manager.enabled = True
manager.counters[MockCounter(manager=manager)] = 1
- with mock.patch.object(manager.term, 'feed') as termfeed:
+ with mock.patch.object(manager, '_flush_streams'):
manager.stop()
- self.assertTrue(termfeed.called)
- def test_resize_handler(self):
+ self.assertEqual(manager._buffer,
+ [term.normal_cursor, term.csr(0, term.height - 1),
+ term.move(term.height, 0), term.cud1 or '\n'])
+
+ def test_resize(self):
"""
Resize lock must be False for handler to run
Terminal size is cached unless resize handler runs
@@ -595,12 +660,12 @@ class TestManager(TestCase):
manager.scroll_offset = 4
term = manager.term
- with mock.patch('%s.width' % TERMINAL, new_callable=mock.PropertyMock) as mockheight:
- mockheight.return_value = 70
+ with mock.patch('%s.width' % TERMINAL, new_callable=mock.PropertyMock) as mockwidth:
+ mockwidth.return_value = 70
manager.resize_lock = True
with mock.patch('enlighten._manager.Manager._set_scroll_area') as ssa:
- manager._resize_handler()
+ manager._stage_resize()
self.assertFalse(ssa.called)
self.assertEqual(manager.width, 80)
@@ -620,15 +685,63 @@ class TestManager(TestCase):
self.assertFalse(manager.resize_lock)
self.tty.stdout.write(u'X\n')
- self.assertEqual(self.tty.stdread.readline(), term.move(19, 0) + term.clear_eos + 'X\n')
+ self.assertEqual(self.tty.stdread.readline(), term.move(21, 0) + term.clear_eos + 'X\n')
self.assertEqual(counter3.calls, ['refresh(flush=False, elapsed=None)'])
- def test_resize(self):
+ def test_threaded_eval(self):
"""
- Test a resize event
+ Dynamic value for threaded determined when scroll area is first set
"""
+
+ # Not dynamic if explicitly True
+ manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter, threaded=True)
+ self.assertTrue(manager.threaded)
+ with mock.patch('threading.active_count', return_value=4):
+ manager.counter()
+ self.assertTrue(manager.threaded)
+
+ # Not dynamic if explicitly False
+ manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter,
+ threaded=False)
+ self.assertFalse(manager.threaded)
+ with mock.patch('threading.active_count', return_value=4):
+ manager.counter()
+ self.assertFalse(manager.threaded)
+
+ # False by default
+ manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter)
+ self.assertIsNone(manager.threaded)
+ manager.counter()
+ self.assertFalse(manager.threaded)
+
+ # True if threaded
+ manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter)
+ self.assertIsNone(manager.threaded)
+ with mock.patch('threading.active_count', return_value=4):
+ manager.counter()
+ self.assertTrue(manager.threaded)
+
+ # True if has child processes
+ manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter)
+ self.assertIsNone(manager.threaded)
+ with mock.patch('multiprocessing.active_children', return_value=[1, 2]):
+ manager.counter()
+ self.assertTrue(manager.threaded)
+
+ # True if is child processes
manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter)
+ self.assertIsNone(manager.threaded)
+ with mock.patch('multiprocessing.current_process') as c_process:
+ c_process.name = 'Process1'
+ manager.counter()
+ self.assertTrue(manager.threaded)
+
+ def test_resize_threaded(self):
+ """
+ Test a resize event threading behavior
+ """
+ manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter, threaded=True)
counter3 = MockCounter(manager=manager)
counter3.last_update = time.time()
manager.counters[counter3] = 3
@@ -640,8 +753,8 @@ class TestManager(TestCase):
self.assertTrue(manager._resize)
self.assertEqual(counter3.last_update, 0)
- with mock.patch('%s.width' % TERMINAL, new_callable=mock.PropertyMock) as mockheight:
- mockheight.return_value = 70
+ with mock.patch('%s.width' % TERMINAL, new_callable=mock.PropertyMock) as mockwidth:
+ mockwidth.return_value = 70
# resize doesn't happen until a write is called
self.assertEqual(manager.width, 80)
@@ -651,16 +764,17 @@ class TestManager(TestCase):
self.assertEqual(ssa.call_count, 1)
self.assertEqual(manager.width, 70)
+
self.tty.stdout.write(u'X\n')
- self.assertEqual(self.tty.stdread.readline(), term.move(19, 0) + term.clear_eos + 'X\n')
+ self.assertEqual(self.tty.stdread.readline(), term.move(21, 0) + term.clear_eos + 'X\n')
self.assertFalse(manager.resize_lock)
self.assertFalse(manager._resize)
self.assertEqual(counter3.calls, ['refresh(flush=False, elapsed=None)'])
- def test_resize_handler_height_only(self):
+ def test_resize_handler_height_less(self):
with mock.patch('%s.height' % TERMINAL, new_callable=mock.PropertyMock) as mockheight:
- mockheight.side_effect = [25, 23, 28, 30, 30]
+ mockheight.side_effect = [25, 23]
manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter)
counter3 = MockCounter(manager=manager)
@@ -671,11 +785,37 @@ class TestManager(TestCase):
manager._resize_handler()
self.assertEqual(ssa.call_count, 1)
- # Height is set in _set_scroll_area which is mocked
- self.assertEqual(manager.height, 25)
+ self.assertEqual(manager.height, 23)
+
+ self.assertEqual(self.tty.stdread.readline(), manager.term.move(19, 0) + '\n')
+ for _ in range(5):
+ self.assertEqual(self.tty.stdread.readline(), '\n')
+
+ self.assertEqual(counter3.calls, ['refresh(flush=False, elapsed=None)'])
+
+ def test_resize_handler_height_greater_threaded(self):
+
+ with mock.patch('%s.height' % TERMINAL, new_callable=mock.PropertyMock) as mockheight:
+ mockheight.side_effect = [25, 27]
+
+ manager = _manager.Manager(stream=self.tty.stdout, counter_class=MockCounter,
+ threaded=True)
+ counter3 = MockCounter(manager=manager)
+ manager.counters[counter3] = 3
+ manager.scroll_offset = 4
+ term = manager.term
+
+ with mock.patch('enlighten._manager.Manager._set_scroll_area') as ssa:
+ manager._resize_handler()
+ self.assertEqual(ssa.call_count, 1)
+
+ self.assertEqual(manager.height, 27)
self.tty.stdout.write(u'X\n')
- self.assertEqual(self.tty.stdread.readline(), 'X\n')
+ self.assertEqual(self.tty.stdread.readline(), term.move(27, 0) + '\n')
+ self.assertEqual(self.tty.stdread.readline(), '\n')
+ self.assertEqual(self.tty.stdread.readline(), '\n')
+ self.assertEqual(self.tty.stdread.readline(), term.move(23, 0) + term.clear_eos + 'X\n')
self.assertEqual(counter3.calls, ['refresh(flush=False, elapsed=None)'])
@@ -749,16 +889,14 @@ class TestManager(TestCase):
with mock.patch.object(_manager.signal, 'signal',
wraps=_manager.signal.signal) as mocksignal:
- with mock.patch('%s.reset' % TERMINAL):
-
- # Test no resize signal stop
- with mock.patch.object(_manager, 'RESIZE_SUPPORTED', False):
- manager.stop()
- self.assertFalse(mocksignal.called)
+ # Test no resize signal stop
+ with mock.patch.object(_manager, 'RESIZE_SUPPORTED', False):
+ manager.stop()
+ self.assertFalse(mocksignal.called)
- # Test normal case stop
- stdmgr.stop()
- self.assertTrue(mocksignal.called)
+ # Test normal case stop
+ stdmgr.stop()
+ self.assertTrue(mocksignal.called)
def test_no_resize(self):
@@ -777,8 +915,7 @@ class TestManager(TestCase):
self.assertFalse(mocksignal.called)
- with mock.patch('%s.reset' % TERMINAL):
- manager.stop()
+ manager.stop()
self.assertFalse(mocksignal.called)
=====================================
tests/test_terminal.py deleted
=====================================
@@ -1,73 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2017 - 2018 Avram Lubkin, All Rights Reserved
-
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0. If a copy of the MPL was not distributed with this
-# file, You can obtain one at http://mozilla.org/MPL/2.0/.
-
-"""
-Test module for enlighten._terminal
-"""
-
-from enlighten import _terminal
-
-from tests import TestCase, mock, MockTTY
-
-
-# pylint: disable=missing-docstring, protected-access
-
-class TestTerminal(TestCase):
- """
- This is hard to test, so, for most tests, we'll just
- make sure the codes get passed through a tty
- """
-
- def setUp(self):
- self.tty = MockTTY()
- self.terminal = _terminal.Terminal(stream=self.tty.stdout, kind='xterm-256color')
-
- def tearDown(self):
- self.tty.close()
-
- def test_caching(self):
- """
- Make sure cached values are held.
- Return values aren't accurate for blessed, but are sufficient for this test
- """
-
- h_and_w = 'enlighten._terminal._Terminal._height_and_width'
-
- with mock.patch(h_and_w, return_value=(1, 2)):
- self.assertEqual(self.terminal._height_and_width(), (1, 2))
-
- with mock.patch(h_and_w, return_value=(5, 6)):
- self.assertEqual(self.terminal._height_and_width(), (1, 2))
- self.terminal.clear_cache()
- self.assertEqual(self.terminal._height_and_width(), (5, 6))
-
- def test_reset(self):
- self.terminal.reset()
- self.tty.stdout.write(u'X\n')
- self.assertEqual(self.tty.stdread.readline(),
- self.terminal.normal_cursor + self.terminal.csr(0, 24) +
- self.terminal.move(25, 0) + 'X\n')
-
- def test_feed(self):
-
- self.terminal.feed()
- self.assertEqual(self.tty.stdread.readline(), self.terminal.cud1)
-
- def test_change_scroll(self):
-
- self.terminal.change_scroll(4)
- self.tty.stdout.write(u'X\n')
- self.assertEqual(self.tty.stdread.readline(),
- self.terminal.hide_cursor + self.terminal.csr(0, 4) +
- self.terminal.move(4, 0) + 'X\n')
-
- def test_move_to(self):
-
- self.terminal.move_to(5, 10)
- self.tty.stdout.write(u'X\n')
- self.assertEqual(self.tty.stdread.readline(),
- self.terminal.move(10, 5) + 'X\n')
=====================================
tox.ini
=====================================
@@ -1,5 +1,5 @@
[tox]
-envlist = lint,coverage,py34,py35,py36,py37,py27,el7,pypy,pypy3,docs
+envlist = lint,coverage,py35,py36,py37,py39,py27,el7,pypy,pypy3,docs
[base]
deps =
@@ -7,6 +7,7 @@ deps =
[testenv]
usedevelop = True
+ignore_errors = True
deps =
{[base]deps}
@@ -89,3 +90,4 @@ commands=
{envpython} setup.py spelling
{envpython} setup_helpers.py spelling
{envpython} setup.py html
+ {envpython} setup_helpers.py rst2html README.rst
View it on GitLab: https://salsa.debian.org/med-team/enlighten/-/commit/38b05bf03a55d3c8959b2967c178743bc003a482
--
View it on GitLab: https://salsa.debian.org/med-team/enlighten/-/commit/38b05bf03a55d3c8959b2967c178743bc003a482
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/debian-med-commit/attachments/20201230/37282048/attachment-0001.html>
More information about the debian-med-commit
mailing list