[Git][debian-gis-team/pyorbital][upstream] New upstream version 1.7.1
Antonio Valentino (@antonio.valentino)
gitlab at salsa.debian.org
Thu Dec 23 16:57:12 GMT 2021
Antonio Valentino pushed to branch upstream at Debian GIS Project / pyorbital
1fa9649a by Antonio Valentino at 2021-12-23T16:13:00+00:00
New upstream version 1.7.1
- - - - -
13 changed files:
- .github/workflows/ci.yaml
- .stickler.yml
- doc/source/index.rst
- examples/tle.yaml
- pyorbital/etc/platforms.txt
- pyorbital/orbital.py
- pyorbital/tests/test_orbital.py
- pyorbital/tests/test_tlefile.py
- pyorbital/tlefile.py
- pyorbital/version.py
@@ -9,7 +9,7 @@ jobs:
fail-fast: true
os: ["ubuntu-latest", "macos-latest", "windows-latest"]
- python-version: ["3.7", "3.8", "3.9"]
+ python-version: ["3.8", "3.9", "3.10"]
PYTHON_VERSION: ${{ matrix.python-version }}
@@ -1,6 +1,4 @@
- max-line-length: 120
- fixer: true
- enable: true
+ python: 3
+ config: setup.cfg
@@ -1,3 +1,43 @@
+## Version 1.7.1 (2021/12/22)
+### Pull Requests Merged
+#### Bugs fixed
+* [PR 92](https://github.com/pytroll/pyorbital/pull/92) - Fix bogus designator assignment
+In this release 1 pull request was closed.
+## Version 1.7.0 (2021/12/20)
+### Issues Closed
+* [Issue 90](https://github.com/pytroll/pyorbital/issues/90) - get_observer_look raises IndexError on numpy 1.21.4
+* [Issue 85](https://github.com/pytroll/pyorbital/issues/85) - Azimuth/elevation output not changing with 1-second increments
+* [Issue 79](https://github.com/pytroll/pyorbital/issues/79) - ModuleNotFoundError: 'pyorbital' is not a package
+* [Issue 72](https://github.com/pytroll/pyorbital/issues/72) - Unexpected Nans in get_observer_look_no_tle
+* [Issue 38](https://github.com/pytroll/pyorbital/issues/38) - Issue with pyorbital.planets
+In this release 5 issues were closed.
+### Pull Requests Merged
+#### Features added
+* [PR 91](https://github.com/pytroll/pyorbital/pull/91) - Add get_observer_look test for scalar case and update stickler config ([91](https://github.com/pytroll/pyorbital/issues/91))
+* [PR 89](https://github.com/pytroll/pyorbital/pull/89) - Change tested Python versions to 3.8, 3.9 and 3.10
+* [PR 83](https://github.com/pytroll/pyorbital/pull/83) - Add Sentinel-5P to platform_names
+* [PR 78](https://github.com/pytroll/pyorbital/pull/78) - Add parser to read TLEs from Multi Mission Administrative Messages
+#### Documentation changes
+* [PR 82](https://github.com/pytroll/pyorbital/pull/82) - Add historical TLE files link
+In this release 5 pull requests were closed.
## Version 1.6.1 (2021/04/12)
### Issues Closed
@@ -1,8 +1,8 @@
@@ -1,6 +1,6 @@
# Releasing Pyorbital
-1. checkout master
+1. checkout main branch
2. pull from repo
3. run the unittests
4. run `loghub` and update the `CHANGELOG.md` file:
@@ -31,6 +31,9 @@ If no path is given pyorbital tries to read the earth observation TLE-files from
TLE download and database
+The historical TLE files can be requested from
+`celestrak <https://celestrak.com/NORAD/archives/request.php>`_.
There is also a script, ``fetch_tles.py``, that can be used to collect
TLE data from several locations. Then currently supported locaions
@@ -42,7 +45,7 @@ are:
The data are saved in a SQLite3 database, and can be written to a file
after each run. To see configuration options, see the example
configuration in ``examples/tle.yaml``.
Computing satellite position
The orbital module enables computation of satellite position and velocity at a specific time:
@@ -59,7 +62,7 @@ The orbital module enables computation of satellite position and velocity at a s
>>> # Get longitude, latitude and altitude of the satellite:
>>> orb.get_lonlatalt(now)
(40.374855865574951, 78.849923885700363, 839.62504115338368)
Use actual TLEs to increase accuracy
@@ -40,6 +40,11 @@ downloaders:
user: <username>
password: <password>
+ read_xml_admin_messages:
+ # Direct reception data from Metop satellites have Eumetsat admin messages in them. This can be used
+ # to read the TLEs from the XML variants
+ paths:
+ - /path/to/xml/admin/messages/*ADMIN_MESSAGE*xml
# For "kickstarting" the database, local files can also be added
@@ -69,6 +69,7 @@ RadarSat-2 32382
Sentinel-1A 39634
Sentinel-3A 41335
Sentinel-3B 43437
+Sentinel-5P 42969
SMOS 36036
SPOT-5 27421
SPOT-6 38755
@@ -126,27 +126,17 @@ def get_observer_look(sat_lon, sat_lat, sat_alt, utc_time, lon, lat, alt):
top_z = cos_lat * cos_theta * rx + \
cos_lat * sin_theta * ry + sin_lat * rz
- az_ = np.arctan(-top_e / top_s)
+ # Azimuth is undefined when elevation is 90 degrees, 180 (pi) will be returned.
+ az_ = np.arctan2(-top_e, top_s) + np.pi
+ az_ = np.mod(az_, 2 * np.pi) # Needed on some platforms
- if has_xarray and isinstance(az_, xr.DataArray):
- az_data = az_.data
- else:
- az_data = az_
- if has_dask and isinstance(az_data, da.Array):
- az_data = da.where(top_s > 0, az_data + np.pi, az_data)
- az_data = da.where(az_data < 0, az_data + 2 * np.pi, az_data)
- else:
- az_data[np.where(top_s > 0)] += np.pi
- az_data[np.where(az_data < 0)] += 2 * np.pi
+ rg_ = np.sqrt(rx * rx + ry * ry + rz * rz)
- if has_xarray and isinstance(az_, xr.DataArray):
- az_.data = az_data
- else:
- az_ = az_data
+ top_z_divided_by_rg_ = top_z / rg_
- rg_ = np.sqrt(rx * rx + ry * ry + rz * rz)
- el_ = np.arcsin(top_z / rg_)
+ # Due to rounding top_z can be larger than rg_ (when el_ ~ 90).
+ top_z_divided_by_rg_ = top_z_divided_by_rg_.clip(max=1)
+ el_ = np.arcsin(top_z_divided_by_rg_)
return np.rad2deg(az_), np.rad2deg(el_)
@@ -219,16 +219,20 @@ class TestGetObserverLook(unittest.TestCase):
def setUp(self):
self.t = datetime(2018, 1, 1, 0, 0, 0)
- self.sat_lon = np.array([[-89.5, -89.4], [-89.3, -89.2]])
- self.sat_lat = np.array([[45.5, 45.4], [45.3, 45.2]])
- self.sat_alt = np.array([[35786, 35786], [35786, 35786]])
- self.lon = np.array([[-85.5, -85.4], [-85.3, -85.2]])
- self.lat = np.array([[40.5, 40.4], [40.3, 40.2]])
- self.alt = np.zeros((2, 2))
- self.exp_azi = np.array([[331.00275902, 330.95954165],
- [330.91642994, 330.87342384]])
- self.exp_elev = np.array([[83.18070976, 83.17788976],
- [83.17507167, 83.1722555]])
+ self.sat_lon = np.array([[-89.5, -89.4, -89.5, -89.4],
+ [-89.3, -89.2, -89.3, -89.2]])
+ self.sat_lat = np.array([[45.5, 45.4, 45.5, 45.4],
+ [45.3, 40.2, 45.3, 40.2]])
+ self.sat_alt = 35786 * np.ones((2, 4))
+ self.lon = np.array([[-85.5, -85.4, -89.5, -99.4],
+ [-85.3, -89.2, -89.3, -79.2]])
+ self.lat = np.array([[40.5, 40.4, 65.5, 45.4],
+ [40.3, 40.2, 25.3, 40.2]])
+ self.alt = np.zeros((2, 4))
+ self.exp_azi = np.array([[331.00275902, 330.95954165, 180, 86.435411],
+ [330.91642994, 180, 0, 273.232073]])
+ self.exp_elev = np.array([[83.18070976, 83.17788976, 66.548467, 81.735221],
+ [83.17507167, 90, 66.559906, 81.010018]])
def test_basic_numpy(self):
"""Test with numpy array inputs"""
@@ -294,6 +298,103 @@ class TestGetObserverLook(unittest.TestCase):
np.testing.assert_allclose(azi.data.compute(), self.exp_azi)
np.testing.assert_allclose(elev.data.compute(), self.exp_elev)
+ def test_scalar(self):
+ """Test with scalar inputs."""
+ from pyorbital.orbital import get_observer_look
+ (azi, elev) = get_observer_look(0, 0, 30_000_000, self.t, 0, 0, 0)
+ np.testing.assert_allclose(elev, 90)
+class TestGetObserverLookNadir(unittest.TestCase):
+ """Test the get_observer_look function when satellite is at nadir."""
+ def setUp(self):
+ """Setup for test observer at nadir.
+ Note that rounding error differs between array types.
+ With 1000 elements a test gives:
+ 1 error for basic numpy
+ 41 errors for basic dask
+ 63 errors for xarray with dask
+ 2 error for xarray with numpy
+ """
+ rng = np.random.RandomState(125)
+ self.t = datetime(2018, 1, 1, 0, 0, 0)
+ self.sat_lon = 360 * rng.rand(100) - 180
+ self.sat_lat = 180 * rng.rand(100) - 90
+ self.sat_alt = rng.rand(100) + 850
+ self.lon = self.sat_lon # + 10E-17
+ self.lat = self.sat_lat # + 10E-17
+ self.alt = np.zeros((100))
+ self.exp_elev = np.zeros((100)) + 90
+ def test_basic_numpy(self):
+ """Test with numpy array inputs"""
+ from pyorbital import orbital
+ azi, elev = orbital.get_observer_look(self.sat_lon, self.sat_lat,
+ self.sat_alt, self.t,
+ self.lon, self.lat, self.alt)
+ self.assertEqual(np.sum(np.isnan(azi)), 0)
+ self.assertFalse(np.isnan(azi).any())
+ np.testing.assert_allclose(elev, self.exp_elev)
+ def test_basic_dask(self):
+ """Test with dask array inputs"""
+ from pyorbital import orbital
+ import dask.array as da
+ sat_lon = da.from_array(self.sat_lon, chunks=2)
+ sat_lat = da.from_array(self.sat_lat, chunks=2)
+ sat_alt = da.from_array(self.sat_alt, chunks=2)
+ lon = da.from_array(self.lon, chunks=2)
+ lat = da.from_array(self.lat, chunks=2)
+ alt = da.from_array(self.alt, chunks=2)
+ azi, elev = orbital.get_observer_look(sat_lon, sat_lat,
+ sat_alt, self.t,
+ lon, lat, alt)
+ self.assertEqual(np.sum(np.isnan(azi)), 0)
+ self.assertFalse(np.isnan(azi).any())
+ np.testing.assert_allclose(elev.compute(), self.exp_elev)
+ def test_xarray_with_numpy(self):
+ """Test with xarray DataArray with numpy array as inputs"""
+ from pyorbital import orbital
+ import xarray as xr
+ def _xarr_conv(input):
+ return xr.DataArray(input)
+ sat_lon = _xarr_conv(self.sat_lon)
+ sat_lat = _xarr_conv(self.sat_lat)
+ sat_alt = _xarr_conv(self.sat_alt)
+ lon = _xarr_conv(self.lon)
+ lat = _xarr_conv(self.lat)
+ alt = _xarr_conv(self.alt)
+ azi, elev = orbital.get_observer_look(sat_lon, sat_lat,
+ sat_alt, self.t,
+ lon, lat, alt)
+ self.assertEqual(np.sum(np.isnan(azi)), 0)
+ self.assertFalse(np.isnan(azi).any())
+ np.testing.assert_allclose(elev.data, self.exp_elev)
+ def test_xarray_with_dask(self):
+ """Test with xarray DataArray with dask array as inputs"""
+ from pyorbital import orbital
+ import dask.array as da
+ import xarray as xr
+ def _xarr_conv(input):
+ return xr.DataArray(da.from_array(input, chunks=2))
+ sat_lon = _xarr_conv(self.sat_lon)
+ sat_lat = _xarr_conv(self.sat_lat)
+ sat_alt = _xarr_conv(self.sat_alt)
+ lon = _xarr_conv(self.lon)
+ lat = _xarr_conv(self.lat)
+ alt = _xarr_conv(self.alt)
+ azi, elev = orbital.get_observer_look(sat_lon, sat_lat,
+ sat_alt, self.t,
+ lon, lat, alt)
+ self.assertEqual(np.sum(np.isnan(azi)), 0)
+ self.assertFalse(np.isnan(azi).any())
+ np.testing.assert_allclose(elev.data.compute(), self.exp_elev)
class TestRegressions(unittest.TestCase):
"""Test regressions."""
@@ -318,6 +419,7 @@ def suite():
mysuite = unittest.TestSuite()
+ mysuite.addTest(loader.loadTestsFromTestCase(TestGetObserverLookNadir))
return mysuite
@@ -35,6 +35,36 @@ line0 = "ISS (ZARYA)"
line1 = "1 25544U 98067A 08264.51782528 -.00002182 00000-0 -11606-4 0 2927"
line2 = "2 25544 51.6416 247.4627 0006703 130.5360 325.0288 15.72125391563537"
+line1_2 = "1 38771U 12049A 21137.30264622 .00000000 00000+0 -49996-5 0 00017"
+line2_2 = "2 38771 98.7162 197.7716 0002383 106.1049 122.6344 14.21477797449453"
+NOAA19_2LINES = """1 33591U 09005A 21355.91138073 .00000074 00000+0 65091-4 0 9998
+2 33591 99.1688 21.1338 0013414 329.8936 30.1462 14.12516400663123
+NOAA19_3LINES = "NOAA 19\n" + NOAA19_2LINES
+tle_xml = '\n'.join(
+ ('<?xml version="1.0" encoding="UTF-8"?>',
+ '<multi-mission-administrative-message>',
+ '<message>',
+ '<two-line-elements>',
+ '<navigation>',
+ '<line-1>' + line1 + '</line-1>',
+ '<line-2>' + line2 + '</line-2>',
+ '</navigation>',
+ '</two-line-elements>',
+ '</message>',
+ '<message>',
+ '<two-line-elements>',
+ '<navigation>',
+ '<line-1>' + line1_2 + '</line-1>',
+ '<line-2>' + line2_2 + '</line-2>',
+ '</navigation>',
+ '</two-line-elements>',
+ '</message>',
+ '</multi-mission-administrative-message>'))
class TLETest(unittest.TestCase):
"""Test TLE reading.
@@ -93,6 +123,58 @@ class TLETest(unittest.TestCase):
+ def test_from_file_with_hyphenated_platform_name(self):
+ """Test reading and parsing from a file with a slightly different name."""
+ from tempfile import mkstemp
+ from os import write, close, remove
+ filehandle, filename = mkstemp()
+ try:
+ write(filehandle, NOAA19_3LINES.encode('utf-8'))
+ close(filehandle)
+ tle = Tle("NOAA-19", filename)
+ assert tle.satnumber == "33591"
+ finally:
+ remove(filename)
+ def test_from_file_with_no_platform_name(self):
+ """Test reading and parsing from a file with a slightly different name."""
+ from tempfile import mkstemp
+ from os import write, close, remove
+ filehandle, filename = mkstemp()
+ try:
+ write(filehandle, NOAA19_2LINES.encode('utf-8'))
+ close(filehandle)
+ tle = Tle("NOAA-19", filename)
+ assert tle.satnumber == "33591"
+ finally:
+ remove(filename)
+ def test_from_mmam_xml(self):
+ """Test reading from an MMAM XML file."""
+ from tempfile import TemporaryDirectory
+ save_dir = TemporaryDirectory()
+ with save_dir:
+ fname = os.path.join(save_dir.name, '20210420_Metop-B_ADMIN_MESSAGE_NO_127.xml')
+ with open(fname, 'w') as fid:
+ fid.write(tle_xml)
+ tle = Tle("", tle_file=fname)
+ self.check_example(tle)
+ "fetch_plain_tle": {
+ "source_1": ["mocked_url_1", "mocked_url_2", "mocked_url_3"],
+ "source_2": ["mocked_url_4"]
+ }
+ "fetch_spacetrack": {
+ "user": "username",
+ "password": "passw0rd"
+ }
class TestDownloader(unittest.TestCase):
"""Test TLE downloader."""
@@ -108,14 +190,10 @@ class TestDownloader(unittest.TestCase):
assert self.dl.config is self.config
- def test_fetch_plain_tle(self, requests):
+ def test_fetch_plain_tle_not_configured(self, requests):
"""Test downloading and a TLE file from internet."""
requests.get = mock.MagicMock()
- # The return value of requests.get()
- req = mock.MagicMock()
- req.status_code = 200
- req.text = '\n'.join((line0, line1, line2))
- requests.get.return_value = req
+ requests.get.return_value = _get_req_response(200)
# Not configured
self.dl.config["downloaders"] = {}
@@ -123,13 +201,15 @@ class TestDownloader(unittest.TestCase):
self.assertTrue(res == {})
+ @mock.patch('pyorbital.tlefile.requests')
+ def test_fetch_plain_tle_two_sources(self, requests):
+ """Test downloading and a TLE file from internet."""
+ requests.get = mock.MagicMock()
+ requests.get.return_value = _get_req_response(200)
# Two sources, one with multiple locations
- self.dl.config["downloaders"] = {
- "fetch_plain_tle": {
- "source_1": ["mocked_url_1", "mocked_url_2", "mocked_url_3"],
- "source_2": ["mocked_url_4"]
- }
- }
+ self.dl.config["downloaders"] = FETCH_PLAIN_TLE_CONFIG
res = self.dl.fetch_plain_tle()
self.assertTrue("source_1" in res)
self.assertEqual(len(res["source_1"]), 3)
@@ -140,12 +220,16 @@ class TestDownloader(unittest.TestCase):
self.assertTrue(mock.call("mocked_url_1") in requests.get.mock_calls)
self.assertEqual(len(requests.get.mock_calls), 4)
- # Reset mocks
- requests.get.reset_mock()
- req.reset_mock()
+ @mock.patch('pyorbital.tlefile.requests')
+ def test_fetch_plain_tle_server_is_a_teapot(self, requests):
+ """Test downloading and a TLE file from internet."""
+ requests.get = mock.MagicMock()
# No data returned because the server is a teapot
- req.status_code = 418
+ requests.get.return_value = _get_req_response(418)
+ # Two sources, one with multiple locations
+ self.dl.config["downloaders"] = FETCH_PLAIN_TLE_CONFIG
res = self.dl.fetch_plain_tle()
# The sources are in the dict ...
self.assertEqual(len(res), 2)
@@ -156,25 +240,17 @@ class TestDownloader(unittest.TestCase):
self.assertEqual(len(requests.get.mock_calls), 4)
- def test_fetch_spacetrack(self, requests):
+ def test_fetch_spacetrack_login_fails(self, requests):
"""Test downloading and TLEs from space-track.org."""
mock_post = mock.MagicMock()
- mock_get = mock.MagicMock()
mock_session = mock.MagicMock()
mock_session.post = mock_post
- mock_session.get = mock_get
requests.Session.return_value.__enter__.return_value = mock_session
- tle_text = '\n'.join((line0, line1, line2))
self.dl.config["platforms"] = {
25544: 'ISS'
- self.dl.config["downloaders"] = {
- "fetch_spacetrack": {
- "user": "username",
- "password": "passw0rd"
- }
- }
+ self.dl.config["downloaders"] = FETCH_SPACETRACK_CONFIG
# Login fails, because the server is a teapot
mock_post.return_value.status_code = 418
@@ -186,6 +262,21 @@ class TestDownloader(unittest.TestCase):
data={'identity': 'username', 'password': 'passw0rd'})
+ @mock.patch('pyorbital.tlefile.requests')
+ def test_fetch_spacetrack_get_fails(self, requests):
+ """Test downloading and TLEs from space-track.org."""
+ mock_post = mock.MagicMock()
+ mock_get = mock.MagicMock()
+ mock_session = mock.MagicMock()
+ mock_session.post = mock_post
+ mock_session.get = mock_get
+ requests.Session.return_value.__enter__.return_value = mock_session
+ self.dl.config["platforms"] = {
+ 25544: 'ISS'
+ }
+ self.dl.config["downloaders"] = FETCH_SPACETRACK_CONFIG
# Login works, but something is wrong (teapot) when asking for data
mock_post.return_value.status_code = 200
mock_get.return_value.status_code = 418
@@ -195,7 +286,24 @@ class TestDownloader(unittest.TestCase):
- # Data is received
+ @mock.patch('pyorbital.tlefile.requests')
+ def test_fetch_spacetrack_success(self, requests):
+ """Test downloading and TLEs from space-track.org."""
+ mock_post = mock.MagicMock()
+ mock_get = mock.MagicMock()
+ mock_session = mock.MagicMock()
+ mock_session.post = mock_post
+ mock_session.get = mock_get
+ requests.Session.return_value.__enter__.return_value = mock_session
+ tle_text = '\n'.join((line0, line1, line2))
+ self.dl.config["platforms"] = {
+ 25544: 'ISS'
+ }
+ self.dl.config["downloaders"] = FETCH_SPACETRACK_CONFIG
+ # Login works and data is received
+ mock_post.return_value.status_code = 200
mock_get.return_value.status_code = 200
mock_get.return_value.text = tle_text
res = self.dl.fetch_spacetrack()
@@ -206,7 +314,6 @@ class TestDownloader(unittest.TestCase):
def test_read_tle_files(self):
"""Test reading TLE files from a file system."""
from tempfile import TemporaryDirectory
- import os
tle_text = '\n'.join((line0, line1, line2))
@@ -229,28 +336,40 @@ class TestDownloader(unittest.TestCase):
self.assertEqual(res[0].line1, line1)
self.assertEqual(res[0].line2, line2)
- def test_parse_tles(self):
- """Test TLE parsing."""
- tle_text = '\n'.join((line0, line1, line2))
+ def test_read_xml_admin_messages(self):
+ """Test reading TLE files from a file system."""
+ from tempfile import TemporaryDirectory
- # Valid data
- res = self.dl.parse_tles(tle_text)
- self.assertEqual(len(res), 1)
+ save_dir = TemporaryDirectory()
+ with save_dir:
+ fname = os.path.join(save_dir.name, '20210420_Metop-B_ADMIN_MESSAGE_NO_127.xml')
+ with open(fname, 'w') as fid:
+ fid.write(tle_xml)
+ # Add a non-existent file, it shouldn't cause a crash
+ nonexistent = os.path.join(save_dir.name, 'not_here.txt')
+ # Use a wildcard to collect files (passed to glob)
+ starred_fname = os.path.join(save_dir.name, '*.xml')
+ self.dl.config["downloaders"] = {
+ "read_xml_admin_messages": {
+ "paths": [fname, nonexistent, starred_fname]
+ }
+ }
+ res = self.dl.read_xml_admin_messages()
+ # There are two sets of TLEs in the file. And as the same file is
+ # parsed twice, 4 TLE objects are returned
+ self.assertEqual(len(res), 4)
self.assertEqual(res[0].line1, line1)
self.assertEqual(res[0].line2, line2)
+ self.assertEqual(res[1].line1, line1_2)
+ self.assertEqual(res[1].line2, line2_2)
- # Only one valid line
- res = self.dl.parse_tles(line1 + '\nbar')
- self.assertTrue(res == [])
- # Valid start of the lines, but bad data
- res = self.dl.parse_tles('1 foo\n2 bar')
- self.assertTrue(res == [])
- # Something wrong in the data
- bad_line2 = '2 ' + 'x' * (len(line2)-2)
- res = self.dl.parse_tles('\n'.join((line1, bad_line2)))
- self.assertTrue(res == [])
+def _get_req_response(code):
+ req = mock.MagicMock()
+ req.status_code = code
+ req.text = '\n'.join((line0, line1, line2))
+ return req
class TestSQLiteTLE(unittest.TestCase):
@@ -28,15 +28,15 @@
import io
import logging
import datetime as dt
- from urllib2 import urlopen
-except ImportError:
- from urllib.request import urlopen
+from urllib.request import urlopen
import os
import glob
import numpy as np
import requests
import sqlite3
+from xml.etree import ElementTree as ET
+from itertools import zip_longest
TLE_URLS = ('http://www.celestrak.com/NORAD/elements/active.txt',
@@ -74,11 +74,14 @@ def read_platform_numbers(in_upper=False, num_as_int=False):
parts = row.split()
if len(parts) < 2:
+ # The satellite name might have whitespace
+ platform = ' '.join(parts[:-1])
+ num = parts[-1]
if in_upper:
- parts[0] = parts[0].upper()
+ platform = platform.upper()
if num_as_int:
- parts[1] = int(parts[1])
- out_dict[parts[0]] = parts[1]
+ num = int(num)
+ out_dict[platform] = num
return out_dict
@@ -95,6 +98,10 @@ in the following format:
+def _dummy_open_stringio(stream):
+ return stream
def read(platform, tle_file=None, line1=None, line2=None):
"""Read TLE for *platform*.
@@ -117,8 +124,6 @@ def fetch(destination):
class ChecksumError(Exception):
- pass
class Tle(object):
"""Class holding TLE objects."""
@@ -188,47 +193,8 @@ class Tle(object):
if self._line1 is not None and self._line2 is not None:
tle = self._line1.strip() + "\n" + self._line2.strip()
- def _open(filename):
- return io.open(filename, 'rb')
- if self._tle_file:
- urls = (self._tle_file,)
- open_func = _open
- elif "TLES" in os.environ:
- # TODO: get the TLE file closest in time to the actual satellite
- # overpass, NOT the latest!
- urls = (max(glob.glob(os.environ["TLES"]),
- key=os.path.getctime), )
- LOGGER.debug("Reading TLE from %s", urls[0])
- open_func = _open
- else:
- LOGGER.debug("Fetch TLE from the internet.")
- urls = TLE_URLS
- open_func = urlopen
- tle = ""
- designator = "1 " + SATELLITES.get(self._platform, '')
- for url in urls:
- fid = open_func(url)
- for l_0 in fid:
- l_0 = l_0.decode('utf-8')
- if l_0.strip() == self._platform:
- l_1 = next(fid).decode('utf-8')
- l_2 = next(fid).decode('utf-8')
- tle = l_1.strip() + "\n" + l_2.strip()
- break
- if(self._platform in SATELLITES and
- l_0.strip().startswith(designator)):
- l_1 = l_0
- l_2 = next(fid).decode('utf-8')
- tle = l_1.strip() + "\n" + l_2.strip()
- LOGGER.debug("Found platform %s, ID: %s",
- self._platform,
- SATELLITES[self._platform])
- break
- fid.close()
- if tle:
- break
+ uris, open_func = _get_uris_and_open_func(tle_file=self._tle_file)
+ tle = _get_first_tle(uris, open_func, platform=self._platform)
if not tle:
raise KeyError("Found no TLE entry for '%s'" % self._platform)
@@ -278,18 +244,83 @@ class Tle(object):
def __str__(self):
"""Format the class data for printing."""
import pprint
- import sys
- if sys.version_info < (3, 0):
- from StringIO import StringIO
- else:
- from io import StringIO
- s_var = StringIO()
+ s_var = io.StringIO()
d_var = dict(([(k, v) for k, v in
list(self.__dict__.items()) if k[0] != '_']))
pprint.pprint(d_var, s_var)
return s_var.getvalue()[:-1]
+def _get_uris_and_open_func(tle_file=None):
+ def _open(filename):
+ return io.open(filename, 'rb')
+ if tle_file:
+ if isinstance(tle_file, io.StringIO):
+ uris = (tle_file,)
+ open_func = _dummy_open_stringio
+ elif "ADMIN_MESSAGE" in tle_file:
+ uris = (io.StringIO(read_tle_from_mmam_xml_file(tle_file)),)
+ open_func = _dummy_open_stringio
+ else:
+ uris = (tle_file,)
+ open_func = _open
+ elif "TLES" in os.environ:
+ # TODO: get the TLE file closest in time to the actual satellite
+ # overpass, NOT the latest!
+ uris = (max(glob.glob(os.environ["TLES"]),
+ key=os.path.getctime), )
+ LOGGER.debug("Reading TLE from %s", uris[0])
+ open_func = _open
+ else:
+ LOGGER.debug("Fetch TLE from the internet.")
+ uris = TLE_URLS
+ open_func = urlopen
+ return uris, open_func
+def _get_first_tle(uris, open_func, platform=''):
+ return _get_tles_from_uris(uris, open_func, platform=platform, only_first=True)
+def _get_tles_from_uris(uris, open_func, platform='', only_first=True):
+ tles = []
+ designator = "1 " + SATELLITES.get(platform, '')
+ for url in uris:
+ fid = open_func(url)
+ for l_0 in fid:
+ tle = ""
+ l_0 = _decode(l_0)
+ if l_0.strip() == platform:
+ l_1 = _decode(next(fid))
+ l_2 = _decode(next(fid))
+ tle = l_1.strip() + "\n" + l_2.strip()
+ elif (platform in SATELLITES or not only_first) and l_0.strip().startswith(designator):
+ l_1 = l_0
+ l_2 = _decode(next(fid))
+ tle = l_1.strip() + "\n" + l_2.strip()
+ if platform:
+ LOGGER.debug("Found platform %s, ID: %s", platform, SATELLITES[platform])
+ elif open_func == _dummy_open_stringio and l_0.startswith(designator):
+ l_1 = l_0
+ l_2 = _decode(next(fid))
+ tle = l_1.strip() + "\n" + l_2.strip()
+ if tle:
+ if only_first:
+ return tle
+ tles.append(tle)
+ if only_first:
+ return ""
+ return tles
+def _decode(itm):
+ if isinstance(itm, str):
+ return itm
+ return itm.decode('utf-8')
PLATFORM_NAMES_TABLE = "(satid text primary key, platform_name text)"
SATID_TABLE = ("'{}' (epoch date primary key, tle text, insertion_time date,"
" source text)")
@@ -316,7 +347,7 @@ class Downloader(object):
for uri in sources[source]:
req = requests.get(uri)
if req.status_code == 200:
- tles[source] += self.parse_tles(req.text)
+ tles[source] += _parse_tles_for_downloader((req.text,), io.StringIO)
if len(failures) > 0:
@@ -353,7 +384,7 @@ class Downloader(object):
req = session.get(download_url)
if req.status_code == 200:
- tles += self.parse_tles(req.text)
+ tles = _parse_tles_for_downloader((req.text,), io.StringIO)
logging.error("Could not retrieve TLEs from Space-Track")
@@ -366,49 +397,68 @@ class Downloader(object):
paths = self.config["downloaders"]["read_tle_files"]["paths"]
# Collect filenames
- fnames = []
- for path in paths:
- if '*' in path:
- fnames += glob.glob(path)
- else:
- if not os.path.exists(path):
- logging.error("File %s doesn't exist.", path)
- continue
- fnames += [path]
+ fnames = collect_filenames(paths)
+ tles = _parse_tles_for_downloader(fnames, open)
+ logging.info("Loaded %d TLEs from local files", len(tles))
- tles = []
- for fname in fnames:
- with open(fname, 'r') as fid:
- data = fid.read()
- tles += self.parse_tles(data)
+ return tles
- logging.info("Loaded %d TLEs from local files", len(tles))
+ def read_xml_admin_messages(self):
+ """Read Eumetsat admin messages in XML format."""
+ paths = self.config["downloaders"]["read_xml_admin_messages"]["paths"]
+ tles = read_tles_from_mmam_xml_files(paths)
+ logging.info("Loaded %d TLEs from admin message XML files", len(tles))
return tles
- def parse_tles(self, raw_data):
- """Parse all the TLEs in the given raw text data."""
- tles = []
- line1, line2 = None, None
- raw_data = raw_data.split('\n')
- for row in raw_data:
- if row.startswith('1 '):
- line1 = row
- elif row.startswith('2 '):
- line2 = row
- else:
+def _parse_tles_for_downloader(item, open_func):
+ return [Tle('', tle_file=io.StringIO(tle)) for tle in
+ _get_tles_from_uris(item, open_func, platform='', only_first=False)]
+def collect_filenames(paths):
+ """Collect all filenames from *paths*."""
+ fnames = []
+ for path in paths:
+ if '*' in path:
+ fnames += glob.glob(path)
+ else:
+ if not os.path.exists(path):
+ logging.error("File %s doesn't exist.", path)
- if line1 is not None and line2 is not None:
- try:
- tle = Tle('', line1=line1, line2=line2)
- except ValueError:
- logging.warning(
- "Invalid data found - line1: %s, line2: %s",
- line1, line2)
- else:
- tles.append(tle)
- line1, line2 = None, None
- return tles
+ fnames += [path]
+ return fnames
+def read_tles_from_mmam_xml_files(paths):
+ # Collect filenames
+ fnames = collect_filenames(paths)
+ tles = []
+ for fname in fnames:
+ data = read_tle_from_mmam_xml_file(fname).split('\n')
+ for two_lines in _group_iterable_to_chunks(2, data):
+ tl_stream = io.StringIO('\n'.join(two_lines))
+ tles.append(Tle('', tle_file=tl_stream))
+ return tles
+def read_tle_from_mmam_xml_file(fname):
+ tree = ET.parse(fname)
+ root = tree.getroot()
+ data = []
+ for nav in root.findall(".//navigation"):
+ data.append(nav.find(".//line-1").text)
+ data.append(nav.find(".//line-2").text)
+ return "\n".join(data)
+def _group_iterable_to_chunks(n, iterable, fillvalue=None):
+ "Collect data into fixed-length chunks or blocks"
+ # _group_iterable_to_chunks(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
+ args = [iter(iterable)] * n
+ return zip_longest(fillvalue=fillvalue, *args)
class SQLiteTLE(object):
@@ -23,9 +23,9 @@ def get_keywords():
# setup.py/versioneer.py will grep for the variable names, so they must
# each be defined on a line of their own. _version.py will just call
# get_keywords().
- git_refnames = " (tag: v1.6.1)"
- git_full = "6b7e685766c268de35f5b42688e066a81418bd73"
- git_date = "2021-04-12 11:48:04 +0200"
+ git_refnames = " (HEAD -> main, tag: v1.7.1)"
+ git_full = "1bf73c834befb6d7f060c636dae85bb91230a556"
+ git_date = "2021-12-22 11:02:02 +0100"
keywords = {"refnames": git_refnames, "full": git_full, "date": git_date}
return keywords
View it on GitLab: https://salsa.debian.org/debian-gis-team/pyorbital/-/commit/1fa9649ae7265e1f7297a6651d5305190d12691c
View it on GitLab: https://salsa.debian.org/debian-gis-team/pyorbital/-/commit/1fa9649ae7265e1f7297a6651d5305190d12691c
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-grass-devel/attachments/20211223/ff6ff26e/attachment-0001.htm>
More information about the Pkg-grass-devel
mailing list