[Git][debian-gis-team/pyorbital][upstream] New upstream version 1.6.0
Antonio Valentino
gitlab at salsa.debian.org
Sun Jun 28 16:00:27 BST 2020
Antonio Valentino pushed to branch upstream at Debian GIS Project / pyorbital
Commits:
5376b354 by Antonio Valentino at 2020-06-28T10:58:35+00:00
New upstream version 1.6.0
- - - - -
17 changed files:
- .gitignore
- + .pre-commit-config.yaml
- .travis.yml
- CHANGELOG.md
- + bin/fetch_tles.py
- doc/source/index.rst
- + examples/tle.yaml
- pyorbital/astronomy.py
- pyorbital/etc/platforms.txt
- pyorbital/geoloc.py
- pyorbital/geoloc_instrument_definitions.py
- pyorbital/orbital.py
- pyorbital/tests/test_orbital.py
- pyorbital/tests/test_tlefile.py
- pyorbital/tlefile.py
- pyorbital/version.py
- setup.py
Changes:
=====================================
.gitignore
=====================================
@@ -13,7 +13,6 @@ dist
build
eggs
parts
-bin
var
sdist
develop-eggs
@@ -39,4 +38,4 @@ nosetests.xml
.pydevproject
# rope
-.ropeproject
\ No newline at end of file
+.ropeproject
=====================================
.pre-commit-config.yaml
=====================================
@@ -0,0 +1,8 @@
+exclude: '^$'
+fail_fast: false
+repos:
+- repo: https://github.com/pre-commit/pre-commit-hooks
+ rev: v2.2.3
+ hooks:
+ - id: flake8
+ additional_dependencies: [flake8-docstrings, flake8-debugger, flake8-bugbear]
=====================================
.travis.yml
=====================================
@@ -1,7 +1,7 @@
language: python
python:
-- '2.7'
-- '3.6'
+- '3.7'
+- '3.8'
install:
- pip install dask[array] xarray
- pip install .
@@ -13,6 +13,8 @@ deploy:
user: Martin.Raspaud
password:
secure: P3WiHVzDAJyZmiIfSF3PhY7Xqp3P3pSHhogla8u3KOw4Sy5Ye6IWwMX1+pupAyhdXgo8ZgGT4+wOn9dBejaLEA0RGIRLMHXd1QxP9BbPD5te/k5aTpzHILx786g5R6G4yw/8s/sftQC6lJT+0jJd2OJjQJsnNUJJTG8OC2uwq3Y=
+ distributions: sdist
+ skip_existing: true
on:
tags: true
repo: pytroll/pyorbital
=====================================
CHANGELOG.md
=====================================
@@ -1,3 +1,34 @@
+## Version 1.6.0 (2020/06/24)
+
+### Issues Closed
+
+* [Issue 52](https://github.com/pytroll/pyorbital/issues/52) - Pyorbital fails to find TLE For some satellites ([PR 53](https://github.com/pytroll/pyorbital/pull/53))
+* [Issue 28](https://github.com/pytroll/pyorbital/issues/28) - Unknown units in return value for get_alt_az ([PR 46](https://github.com/pytroll/pyorbital/pull/46))
+
+In this release 2 issues were closed.
+
+### Pull Requests Merged
+
+#### Bugs fixed
+
+* [PR 46](https://github.com/pytroll/pyorbital/pull/46) - Fix doc about get_alt_az() return units ([28](https://github.com/pytroll/pyorbital/issues/28))
+
+#### Features added
+
+* [PR 56](https://github.com/pytroll/pyorbital/pull/56) - Add a script to download TLEs and store them to a database
+* [PR 53](https://github.com/pytroll/pyorbital/pull/53) - Added active.txt tle path to TLE_URLS ([52](https://github.com/pytroll/pyorbital/issues/52))
+* [PR 50](https://github.com/pytroll/pyorbital/pull/50) - docstring fixes
+* [PR 49](https://github.com/pytroll/pyorbital/pull/49) - Equatorial Crossing Time
+* [PR 47](https://github.com/pytroll/pyorbital/pull/47) - Add support for MWHS-2 (FY-3) and skip edge-functions
+* [PR 45](https://github.com/pytroll/pyorbital/pull/45) - Adds engineering.txt TLE source ([15](https://github.com/pytroll/pyorbital/issues/15))
+
+#### Documentation changes
+
+* [PR 50](https://github.com/pytroll/pyorbital/pull/50) - docstring fixes
+
+In this release 8 pull requests were closed.
+
+
## Version 1.5.0 (2018/11/16)
### Pull Requests Merged
=====================================
bin/fetch_tles.py
=====================================
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+
+"""Script to download and store satellite TLE data."""
+
+import sys
+import logging
+import logging.config
+
+import yaml
+from pyorbital.tlefile import Downloader, SQLiteTLE
+
+
+def read_config(config_fname):
+ """Read and parse config file."""
+ with open(config_fname, 'r') as fid:
+ config = yaml.load(fid, Loader=yaml.SafeLoader)
+ return config
+
+
+def main():
+ """Run TLE downloader."""
+ config = read_config(sys.argv[1])
+ if 'logging' in config:
+ logging.config.dictConfig(config['logging'])
+ else:
+ logging.basicConfig(level=logging.INFO)
+
+ downloader = Downloader(config)
+ db = SQLiteTLE(config['database']['path'], config['platforms'],
+ config['text_writer'])
+
+ logging.info("Start downloading TLEs")
+ for dl_ in config['downloaders']:
+ fetcher = getattr(downloader, dl_)
+ tles = fetcher()
+ if isinstance(tles, dict):
+ for source in tles:
+ for tle in tles[source]:
+ db.update_db(tle, source)
+ else:
+ source = 'file'
+ if "spacetrack" in dl_:
+ source = 'spacetrack'
+ for tle in tles:
+ db.update_db(tle, source)
+
+ db.write_tle_txt()
+ db.close()
+ logging.info("TLE downloading finished")
+
+
+if __name__ == "__main__":
+ main()
=====================================
doc/source/index.rst
=====================================
@@ -27,6 +27,21 @@ Pyorbital has a module for parsing NORAD TLE-files
99.043499999999995
If no path is given pyorbital tries to read the earth observation TLE-files from celestrak.com
+
+TLE download and database
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+There is also a script, ``fetch_tles.py``, that can be used to collect
+TLE data from several locations. The currently supported locations
+are:
+
+* generic network locations without login
+* Space-Track (login credentials needed)
+* local files
+
+The data are saved in a SQLite3 database, and can be written to a file
+after each run. For the available configuration options, see the example
+configuration in ``examples/tle.yaml``.
Computing satellite position
----------------------------
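The configuration-driven flow described above can be sketched in isolation. This is a minimal stand-alone sketch of the dispatch loop in ``bin/fetch_tles.py``; ``StubDownloader`` and the ``'tle-*'`` strings are stand-ins, not pyorbital's real ``Downloader`` or TLE objects:

```python
# Each entry under config['downloaders'] names a fetcher method that the
# script looks up with getattr(). Network sources return a dict keyed by
# source name; local-file readers return a flat list.
class StubDownloader:
    def fetch_plain_tle(self):
        # network sources: dict keyed by source name
        return {'celestrak': ['tle-a', 'tle-b']}

    def read_tle_files(self):
        # local files: flat list
        return ['tle-c']


config = {'downloaders': ['fetch_plain_tle', 'read_tle_files']}
downloader = StubDownloader()
collected = []
for name in config['downloaders']:
    tles = getattr(downloader, name)()
    if isinstance(tles, dict):
        for source, items in tles.items():
            collected.extend(items)
    else:
        collected.extend(tles)
# collected now holds entries from both kinds of source
```

In the real script each TLE is passed to ``db.update_db(tle, source)`` instead of being collected in a list.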
=====================================
examples/tle.yaml
=====================================
@@ -0,0 +1,63 @@
+# Settings for the TLE dataset
+database:
+ # Path to the dataset where all the data are saved.
+ path: /tmp/tle.db
+
+text_writer:
+ # Directory to save to. Created if missing.
+ output_dir: "/tmp/%Y-%m"
+ # Pattern of the filenames to write.
+ filename_pattern: "tle_%Y%m%d_%H%M.txt"
+ # Write the platform name before the TLE data. Default: False.
+ write_name: False
+ # Write the text file after every invocation. Default: False
+ write_always: False
+
+platforms:
+ # Satellite NORAD ID numbers and corresponding platform names
+ # Only IDs listed here will be added to database and saved to text files
+ 25338: NOAA-15
+ 28654: NOAA-18
+ 33591: NOAA-19
+ 37849: Suomi-NPP
+ 43013: NOAA-20
+ 29499: Metop-A
+ 38771: Metop-B
+ 43689: Metop-C
+
+downloaders:
+ fetch_plain_tle:
+ eumetsat: # This is a name used for the source in logs
+ - http://oiswww.eumetsat.int/metopTLEs/html/data_out/latest_m02_tle.txt
+ - http://oiswww.eumetsat.int/metopTLEs/html/data_out/latest_m01_tle.txt
+ - http://oiswww.eumetsat.int/metopTLEs/html/data_out/latest_m03_tle.txt
+ - http://oiswww.eumetsat.int/metopTLEs/html/data_out/latest_n15_tle.txt
+ - http://oiswww.eumetsat.int/metopTLEs/html/data_out/latest_n18_tle.txt
+ - http://oiswww.eumetsat.int/metopTLEs/html/data_out/latest_n19_tle.txt
+ - http://oiswww.eumetsat.int/metopTLEs/html/data_out/latest_npp_tle.txt
+ celestrak: # This is a name used for the source in logs
+ - https://www.celestrak.com/NORAD/elements/weather.txt
+ fetch_spacetrack:
+ user: <username>
+ password: <password>
+ read_tle_files:
+ # For "kickstarting" the database, local files can also be added
+ paths:
+ - /path/to/a/file/tle.txt
+ - /path/to/many/files/tle*.txt
+
+logging:
+ version: 1
+ formatters:
+ fmt:
+ format: '[%(asctime)s %(levelname)-8s %(name)s] %(message)s'
+ handlers:
+ console:
+ class: logging.StreamHandler
+ level: DEBUG
+ formatter: fmt
+ # stream: ext://sys.stdout
+ root:
+ level: DEBUG
+ propagate: false
+ handlers: [console]
=====================================
pyorbital/astronomy.py
=====================================
@@ -118,7 +118,7 @@ def _local_hour_angle(utc_time, longitude, right_ascension):
def get_alt_az(utc_time, lon, lat):
"""Return sun altitude and azimuth from *utc_time*, *lon*, and *lat*..
lon,lat in degrees
- What is the unit of the returned angles and heights!? FIXME!
+ The returned angles are given in radians.
"""
lon = np.deg2rad(lon)
lat = np.deg2rad(lat)
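Following the docstring fix, converting the radians returned by ``get_alt_az`` to degrees is a one-liner; the input values below are illustrative, not a real solar position:

```python
import numpy as np

# get_alt_az() returns (altitude, azimuth) in radians, per the corrected
# docstring above; np.rad2deg converts them for display.
alt_rad, az_rad = np.pi / 4, np.pi / 2   # illustrative values only
alt_deg, az_deg = np.rad2deg(alt_rad), np.rad2deg(az_rad)
```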
=====================================
pyorbital/etc/platforms.txt
=====================================
@@ -48,6 +48,16 @@ Meteosat-10 38552
Meteosat-11 40732
Metop-A 29499
Metop-B 38771
+Metop-C 43689
+TIROS-N 11060
+NOAA-6 11416
+NOAA-7 12553
+NOAA-8 13923
+NOAA-9 15427
+NOAA-10 16969
+NOAA-11 19531
+NOAA-12 21263
+NOAA-14 23455
NOAA-15 25338
NOAA-16 26536
NOAA-17 27453
@@ -56,6 +66,8 @@ NOAA-19 33591
NOAA-20 43013
RadarSat-2 32382
Sentinel-1A 39634
+Sentinel-3A 41335
+Sentinel-3B 43437
SMOS 36036
SPOT-5 27421
SPOT-6 38755
=====================================
pyorbital/geoloc.py
=====================================
@@ -1,11 +1,12 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-# Copyright (c) 2011, 2012, 2013, 2014, 2015, 2017.
+# Copyright (c) 2011 - 2019 Pytroll Community
# Author(s):
# Martin Raspaud <martin.raspaud at smhi.se>
+# Adam Dybbroe <adam.dybbroe at smhi.se>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
=====================================
pyorbital/geoloc_instrument_definitions.py
=====================================
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-# Copyright (c) 2013 - 2018 PyTroll Community
+# Copyright (c) 2013 - 2019 PyTroll Community
# Author(s):
@@ -209,14 +209,14 @@ def viirs_edge_geom(scans_nb):
#
################################################################
-def amsua(scans_nb, edges_only=False):
+def amsua(scans_nb, scan_points=None):
""" Describe AMSU-A instrument geometry
Parameters:
scans_nb | int - number of scan lines
Keywords:
- * edges_only - use only edge pixels
+ * scan_points - Subset of scan positions to compute. Defaults to all positions.
Returns:
pyorbital.geoloc.ScanGeometry object
@@ -229,9 +229,7 @@ def amsua(scans_nb, edges_only=False):
sampling_interval = 0.2 # single view, seconds
sync_time = 0.00355 # delay before the actual scan starts
- if edges_only:
- scan_points = np.array([0, scan_len - 1])
- else:
+ if scan_points is None:
scan_points = np.arange(0, scan_len)
# build the instrument (scan angles)
@@ -250,18 +248,13 @@ def amsua(scans_nb, edges_only=False):
return ScanGeometry(samples, times)
-def amsua_edge_geom(scans_nb):
- # we take only edge pixels
- return amsua(scans_nb, edges_only=True)
-
-
################################################################
#
# MHS
#
################################################################
-def mhs(scans_nb, edges_only=False):
+def mhs(scans_nb, scan_points=None):
""" Describe MHS instrument geometry
See:
@@ -275,7 +268,7 @@ def mhs(scans_nb, edges_only=False):
scans_nb | int - number of scan lines
Keywords:
- * edges_only - use only edge pixels
+ * scan_points - Subset of scan positions to compute. Defaults to all positions.
Returns:
pyorbital.geoloc.ScanGeometry object
@@ -286,30 +279,32 @@ def mhs(scans_nb, edges_only=False):
scan_rate = 8 / 3. # single scan, seconds
scan_angle = -49.444 # swath, degrees
sampling_interval = (8 / 3. - 1) / 90. # single view, seconds
+ sync_time = 0.0 # delay before the actual scan starts - don't know! FIXME!
- if edges_only:
- scan_points = np.array([0, scan_len - 1])
- else:
+ if scan_points is None:
scan_points = np.arange(0, scan_len)
# build the instrument (scan angles)
- samples = np.vstack(((scan_points / (scan_len * 0.5 - 0.5) - 1)
- * np.deg2rad(scan_angle),
+ samples = np.vstack(((scan_points / (scan_len * 0.5 - 0.5) - 1) * np.deg2rad(scan_angle),
np.zeros((len(scan_points),))))
samples = np.tile(samples[:, np.newaxis, :], [1, np.int(scans_nb), 1])
# building the corresponding times array
offset = np.arange(scans_nb) * scan_rate
- times = (np.tile(scan_points * sampling_interval, [np.int(scans_nb), 1])
- + np.expand_dims(offset, 1))
+ times = (np.tile(scan_points * sampling_interval + sync_time, [np.int(scans_nb), 1]) + np.expand_dims(offset, 1))
- # build the scan geometry object
- return ScanGeometry(samples, times)
+ # scan_angles = np.linspace(-np.deg2rad(scan_angle), np.deg2rad(scan_angle), scan_len)[scan_points]
+ # samples = np.vstack((scan_angles, np.zeros(len(scan_points) * 1,)))
+ # samples = np.tile(samples[:, np.newaxis, :], [1, np.int(scans_nb), 1])
-def mhs_edge_geom(scans_nb):
- # we take only edge pixels
- return mhs(scans_nb, edges_only=True)
+ # # building the corresponding times array
+ # offset = np.arange(scans_nb) * scan_rate
+ # times = (np.tile(scan_points * sampling_interval, [np.int(scans_nb), 1])
+ # + np.expand_dims(offset, 1))
+
+ # build the scan geometry object
+ return ScanGeometry(samples, times)
################################################################
@@ -318,7 +313,7 @@ def mhs_edge_geom(scans_nb):
#
################################################################
-def hirs4(scans_nb, edges_only=False):
+def hirs4(scans_nb, scan_points=None):
"""Describe HIRS/4 instrument geometry.
See:
@@ -331,7 +326,7 @@ def hirs4(scans_nb, edges_only=False):
scans_nb | int - number of scan lines
Keywords:
- * edges_only - use only edge pixels
+ * scan_points - Subset of scan positions to compute. Defaults to all positions.
Returns:
pyorbital.geoloc.ScanGeometry object
@@ -343,9 +338,7 @@ def hirs4(scans_nb, edges_only=False):
scan_angle = -49.5 # swath, degrees
sampling_interval = abs(scan_rate) / scan_len # single view, seconds
- if edges_only:
- scan_points = np.array([0, scan_len - 1])
- else:
+ if scan_points is None:
scan_points = np.arange(0, scan_len)
# build the instrument (scan angles)
@@ -363,19 +356,14 @@ def hirs4(scans_nb, edges_only=False):
return ScanGeometry(samples, times)
-def hirs4_edge_geom(scans_nb):
- # we take only edge pixels
- return hirs4(scans_nb, edges_only=True)
-
-
################################################################
#
# ATMS
#
################################################################
-def atms(scans_nb, edges_only=False):
- """ Describe MHS instrument geometry
+def atms(scans_nb, scan_points=None):
+ """ Describe ATMS instrument geometry
See:
- https://dtcenter.org/com-GSI/users/docs/presentations/2013_workshop/
@@ -388,7 +376,7 @@ def atms(scans_nb, edges_only=False):
scans_nb | int - number of scan lines
Keywords:
- * edges_only - use only edge pixels
+ * scan_points - Subset of scan positions to compute. Defaults to all positions.
Returns:
pyorbital.geoloc.ScanGeometry object
@@ -400,9 +388,61 @@ def atms(scans_nb, edges_only=False):
scan_angle = -52.7 # swath, degrees
sampling_interval = 18e-3 # single view, seconds
- if edges_only:
- scan_points = np.array([0, scan_len - 1])
- else:
+ if scan_points is None:
+ scan_points = np.arange(0, scan_len)
+
+ # build the instrument (scan angles)
+ scan_angles = np.linspace(-np.deg2rad(scan_angle), np.deg2rad(scan_angle), scan_len)[scan_points]
+
+ samples = np.vstack((scan_angles, np.zeros(len(scan_points) * 1,)))
+ samples = np.tile(samples[:, np.newaxis, :], [1, np.int(scans_nb), 1])
+
+ # building the corresponding times array
+ offset = np.arange(scans_nb) * scan_rate
+ times = (np.tile(scan_points * sampling_interval, [np.int(scans_nb), 1])
+ + np.expand_dims(offset, 1))
+
+ # build the scan geometry object
+ return ScanGeometry(samples, times)
+
+
+################################################################
+#
+# MWHS-2
+#
+################################################################
+
+def mwhs2(scans_nb, scan_points=None):
+ """Describe MWHS-2 instrument geometry
+
+ The scanning period is 2.667 s. Main beams of the antenna scan over the
+ observing swath (±53.35° from nadir) in the cross-track direction at a
+ constant time of 1.71 s. There are 98 pixels sampled per scan during 1.71 s,
+ and each sample has the same integration period.
+
+ See:
+
+ http://english.nssc.cas.cn/rh/rp/201501/W020150122580098790190.pdf
+
+ Parameters:
+ scans_nb | int - number of scan lines
+
+ Keywords:
+ * scan_points - Subset of scan positions to compute. Defaults to all positions.
+
+ Returns:
+ pyorbital.geoloc.ScanGeometry object
+
+ """
+
+ scan_len = 98 # 98 samples per scan
+ scan_rate = 8 / 3. # single scan, seconds
+ scan_angle = -53.35 # swath, degrees
+ sampling_interval = (8 / 3. - 1) / 98. # single view, seconds
+ # sampling_interval = 17.449e-3 # single view, seconds
+ sync_time = 0.0 # delay before the actual scan starts - don't know! FIXME!
+
+ if scan_points is None:
scan_points = np.arange(0, scan_len)
# build the instrument (scan angles)
@@ -413,17 +453,25 @@ def atms(scans_nb, edges_only=False):
# building the corresponding times array
offset = np.arange(scans_nb) * scan_rate
- times = (np.tile(scan_points * sampling_interval, [np.int(scans_nb), 1])
+ times = (np.tile(scan_points * sampling_interval + sync_time,
+ [np.int(scans_nb), 1])
+ np.expand_dims(offset, 1))
+ # # build the instrument (scan angles)
+ # scan_angles = np.linspace(-np.deg2rad(scan_angle), np.deg2rad(scan_angle), scan_len)[scan_points]
+
+ # samples = np.vstack((scan_angles, np.zeros(len(scan_points) * 1,)))
+ # samples = np.tile(samples[:, np.newaxis, :], [1, np.int(scans_nb), 1])
+
+ # # building the corresponding times array
+ # offset = np.arange(scans_nb) * scan_rate
+ # times = (np.tile(scan_points * sampling_interval, [np.int(scans_nb), 1])
+ # + np.expand_dims(offset, 1))
+
# build the scan geometry object
return ScanGeometry(samples, times)
-def atms_edge_geom(scans_nb):
- # we take only edge pixels
- return atms(scans_nb, edges_only=True)
-
################################################################
#
# OLCI
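The MWHS-2 timing figures quoted in the new ``mwhs2()`` docstring can be checked quickly. The active time of ``(8/3 - 1)`` s mirrors the value used in the code; the docstring's 1.71 s is a slightly different published figure:

```python
# 98 samples are taken during the active part of a 2.667 s scan period,
# each with the same integration period.
scan_len = 98                 # samples per scan
scan_rate = 8 / 3.0           # full scan period, seconds
active_time = scan_rate - 1   # approximate active scan time, seconds
sampling_interval = active_time / scan_len
```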
=====================================
pyorbital/orbital.py
=====================================
@@ -91,12 +91,12 @@ def get_observer_look(sat_lon, sat_lat, sat_alt, utc_time, lon, lat, alt):
"""Calculate observers look angle to a satellite.
http://celestrak.com/columns/v02n02/
- utc_time: Observation time (datetime object)
- lon: Longitude of observer position on ground in degrees east
- lat: Latitude of observer position on ground in degrees north
- alt: Altitude above sea-level (geoid) of observer position on ground in km
+ :utc_time: Observation time (datetime object)
+ :lon: Longitude of observer position on ground in degrees east
+ :lat: Latitude of observer position on ground in degrees north
+ :alt: Altitude above sea-level (geoid) of observer position on ground in km
- Return: (Azimuth, Elevation)
+ :return: (Azimuth, Elevation)
"""
(pos_x, pos_y, pos_z), (vel_x, vel_y, vel_z) = astronomy.observer_position(
utc_time, sat_lon, sat_lat, sat_alt)
@@ -298,9 +298,12 @@ class Orbital(object):
return np.rad2deg(az_), np.rad2deg(el_)
- def get_orbit_number(self, utc_time, tbus_style=False):
+ def get_orbit_number(self, utc_time, tbus_style=False, as_float=False):
"""Calculate orbit number at specified time.
- Optionally use TBUS-style orbit numbering (TLE orbit number + 1)
+
+ Args:
+ tbus_style: If True, use TBUS-style orbit numbering (TLE orbit number + 1)
+ as_float: Return a continuous orbit number as float.
"""
utc_time = np.datetime64(utc_time)
try:
@@ -324,12 +327,15 @@ class Orbital(object):
dt = astronomy._days(utc_time - self.orbit_elements.an_time)
orbit_period = astronomy._days(self.orbit_elements.an_period)
- orbit = int(self.tle.orbit + dt / orbit_period +
- self.tle.mean_motion_derivative * dt**2 +
- self.tle.mean_motion_sec_derivative * dt**3)
+ orbit = self.tle.orbit + dt / orbit_period + \
+ self.tle.mean_motion_derivative * dt ** 2 + \
+ self.tle.mean_motion_sec_derivative * dt ** 3
+ if not as_float:
+ orbit = int(orbit)
if tbus_style:
orbit += 1
+
return orbit
def get_next_passes(self, utc_time, length, lon, lat, alt, tol=0.001, horizon=0):
@@ -338,15 +344,15 @@ class Orbital(object):
Original by Martin.
- utc_time: Observation time (datetime object)
- length: Number of hours to find passes (int)
- lon: Longitude of observer position on ground (float)
- lat: Latitude of observer position on ground (float)
- alt: Altitude above sea-level (geoid) of observer position on ground (float)
- tol: precision of the result in seconds
- horizon: the elevation of horizon to compute risetime and falltime.
-
- Return: [(rise-time, fall-time, max-elevation-time), ...]
+ :utc_time: Observation time (datetime object)
+ :length: Number of hours to find passes (int)
+ :lon: Longitude of observer position on ground (float)
+ :lat: Latitude of observer position on ground (float)
+ :alt: Altitude above sea-level (geoid) of observer position on ground (float)
+ :tol: precision of the result in seconds
+ :horizon: the elevation of horizon to compute risetime and falltime.
+
+ :return: [(rise-time, fall-time, max-elevation-time), ...]
"""
def elevation(minutes):
@@ -484,6 +490,74 @@ class Orbital(object):
else:
return None
+ def utc2local(self, utc_time):
+ """Convert UTC to local time."""
+ lon, _, _ = self.get_lonlatalt(utc_time)
+ return utc_time + timedelta(hours=lon * 24 / 360.0)
+
+ def get_equatorial_crossing_time(self, tstart, tend, node='ascending', local_time=False,
+ rtol=1E-9):
+ """Estimate the equatorial crossing time of an orbit.
+
+ The crossing time is determined via the orbit number, which increases by one if the
+ spacecraft passes the ascending node at the equator. A bisection algorithm is used to find
+ the time of that passage.
+
+ Args:
+ tstart: Start time of the orbit
+ tend: End time of the orbit. Orbit number at the end must be at least one greater than
+ at the start. If there are multiple revolutions in the given time interval, the
+ crossing time of the last revolution in that interval will be computed.
+ node: Specifies whether to compute the crossing time at the ascending or descending
+ node. Choices: ('ascending', 'descending').
+ local_time: By default the UTC crossing time is returned. Use this flag to convert UTC
+ to local time.
+ rtol: Tolerance of the bisection algorithm. The smaller the tolerance, the more accurate
+ the result.
+ """
+ # Determine orbit number at the start and end of the orbit.
+ n_start = self.get_orbit_number(tstart, as_float=True)
+ n_end = self.get_orbit_number(tend, as_float=True)
+ if int(n_end) - int(n_start) == 0:
+ # Orbit doesn't cross the equator in the given time interval
+ return None
+ elif n_end - n_start > 1:
+ warnings.warn('Multiple revolutions between start and end time. Computing crossing '
+ 'time for the last revolution in that interval.')
+
+ # Let n'(t) = n(t) - offset. Determine offset so that n'(tstart) < 0 and n'(tend) > 0 and
+ # n'(tcross) = 0.
+ offset = int(n_end)
+ if node == 'descending':
+ offset = offset + 0.5
+
+ # Use bisection algorithm to find the root of n'(t), which is the crossing time. The
+ # algorithm requires continuous time coordinates, so convert timestamps to microseconds
+ # since 1970.
+ time_unit = 'us' # same precision as datetime
+
+ def _nprime(time_f):
+ """Continuous orbit number as a function of time."""
+ time64 = np.datetime64(int(time_f), time_unit)
+ n = self.get_orbit_number(time64, as_float=True)
+ return n - offset
+
+ try:
+ tcross = optimize.bisect(_nprime,
+ a=np.datetime64(tstart, time_unit).astype(int),
+ b=np.datetime64(tend, time_unit).astype(int),
+ rtol=rtol)
+ except ValueError:
+ # Bisection did not converge
+ return None
+ tcross = np.datetime64(int(tcross), time_unit).astype(datetime)
+
+ # Convert UTC to local time
+ if local_time:
+ tcross = self.utc2local(tcross)
+
+ return tcross
+
class OrbitElements(object):
=====================================
pyorbital/tests/test_orbital.py
=====================================
@@ -24,6 +24,10 @@
"""
import unittest
+try:
+ from unittest import mock
+except ImportError:
+ import mock
from datetime import datetime, timedelta
import numpy as np
@@ -151,6 +155,52 @@ class Test(unittest.TestCase):
self.assertTrue(len(res) == 15)
+ @mock.patch('pyorbital.orbital.Orbital.get_lonlatalt')
+ def test_utc2local(self, get_lonlatalt):
+ get_lonlatalt.return_value = -45, None, None
+ sat = orbital.Orbital("METOP-A",
+ line1="1 29499U 06044A 13060.48822809 "
+ ".00000017 00000-0 27793-4 0 9819",
+ line2="2 29499 98.6639 121.6164 0001449 "
+ "71.9056 43.3132 14.21510544330271")
+ self.assertEqual(sat.utc2local(datetime(2009, 7, 1, 12)),
+ datetime(2009, 7, 1, 9))
+
+ @mock.patch('pyorbital.orbital.Orbital.utc2local')
+ @mock.patch('pyorbital.orbital.Orbital.get_orbit_number')
+ def test_get_equatorial_crossing_time(self, get_orbit_number, utc2local):
+ def get_orbit_number_patched(utc_time, **kwargs):
+ utc_time = np.datetime64(utc_time)
+ diff = (utc_time - np.datetime64('2009-07-01 12:38:12')) / np.timedelta64(7200, 's')
+ return 1234 + diff
+
+ get_orbit_number.side_effect = get_orbit_number_patched
+ utc2local.return_value = 'local_time'
+ sat = orbital.Orbital("METOP-A",
+ line1="1 29499U 06044A 13060.48822809 "
+ ".00000017 00000-0 27793-4 0 9819",
+ line2="2 29499 98.6639 121.6164 0001449 "
+ "71.9056 43.3132 14.21510544330271")
+
+ # Ascending node
+ res = sat.get_equatorial_crossing_time(tstart=datetime(2009, 7, 1, 12),
+ tend=datetime(2009, 7, 1, 13))
+ exp = datetime(2009, 7, 1, 12, 38, 12)
+ self.assertTrue((res - exp) < timedelta(seconds=0.01))
+
+ # Descending node
+ res = sat.get_equatorial_crossing_time(tstart=datetime(2009, 7, 1, 12),
+ tend=datetime(2009, 7, 1, 14, 0),
+ node='descending')
+ exp = datetime(2009, 7, 1, 13, 38, 12)
+ self.assertTrue((res - exp) < timedelta(seconds=0.01))
+
+ # Conversion to local time
+ res = sat.get_equatorial_crossing_time(tstart=datetime(2009, 7, 1, 12),
+ tend=datetime(2009, 7, 1, 14),
+ local_time=True)
+ self.assertEqual(res, 'local_time')
+
class TestGetObserverLook(unittest.TestCase):
"""Test the get_observer_look function"""
=====================================
pyorbital/tests/test_tlefile.py
=====================================
@@ -1,32 +1,34 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-
+#
# Copyright (c) 2014 Martin Raspaud
-
+#
# Author(s):
-
+#
# Martin Raspaud <martin.raspaud at smhi.se>
-
+# Panu Lahtinen <panu.lahtinen at fmi.fi>
+#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
-
+#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
-
+#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""Testing TLE file reading
-"""
+"""Test TLE file reading, TLE downloading and storing TLEs to a database."""
from pyorbital.tlefile import Tle
import datetime
import unittest
+from unittest import mock
+import os
line0 = "ISS (ZARYA)"
line1 = "1 25544U 98067A 08264.51782528 -.00002182 00000-0 -11606-4 0 2927"
@@ -45,8 +47,7 @@ class TLETest(unittest.TestCase):
"""
def check_example(self, tle):
- """Check the *tle* instance against predetermined values.
- """
+ """Check the *tle* instance against predetermined values."""
# line 1
self.assertEqual(tle.satnumber, "25544")
self.assertEqual(tle.classification, "U")
@@ -74,10 +75,12 @@ class TLETest(unittest.TestCase):
self.assertEqual(tle.orbit, 56353)
def test_from_line(self):
+ """Test parsing from line elements."""
tle = Tle("ISS (ZARYA)", line1=line1, line2=line2)
self.check_example(tle)
def test_from_file(self):
+ """Test reading and parsing from a file."""
from tempfile import mkstemp
from os import write, close, remove
filehandle, filename = mkstemp()
@@ -90,11 +93,324 @@ class TLETest(unittest.TestCase):
remove(filename)
+class TestDownloader(unittest.TestCase):
+ """Test TLE downloader."""
+
+ def setUp(self):
+ """Create a downloader instance."""
+ from pyorbital.tlefile import Downloader
+ self.config = {}
+ self.dl = Downloader(self.config)
+
+ def test_init(self):
+ """Test the initialization."""
+ assert self.dl.config is self.config
+
+ @mock.patch('pyorbital.tlefile.requests')
+ def test_fetch_plain_tle(self, requests):
+ """Test downloading a TLE file from the internet."""
+ requests.get = mock.MagicMock()
+ # The return value of requests.get()
+ req = mock.MagicMock()
+ req.status_code = 200
+ req.text = '\n'.join((line0, line1, line2))
+ requests.get.return_value = req
+
+ # Not configured
+ self.dl.config["downloaders"] = {}
+ res = self.dl.fetch_plain_tle()
+ self.assertTrue(res == {})
+ requests.get.assert_not_called()
+
+ # Two sources, one with multiple locations
+ self.dl.config["downloaders"] = {
+ "fetch_plain_tle": {
+ "source_1": ["mocked_url_1", "mocked_url_2", "mocked_url_3"],
+ "source_2": ["mocked_url_4"]
+ }
+ }
+ res = self.dl.fetch_plain_tle()
+ self.assertTrue("source_1" in res)
+ self.assertEqual(len(res["source_1"]), 3)
+ self.assertEqual(res["source_1"][0].line1, line1)
+ self.assertEqual(res["source_1"][0].line2, line2)
+ self.assertTrue("source_2" in res)
+ self.assertEqual(len(res["source_2"]), 1)
+ self.assertTrue(mock.call("mocked_url_1") in requests.get.mock_calls)
+ self.assertEqual(len(requests.get.mock_calls), 4)
+
+ # Reset mocks
+ requests.get.reset_mock()
+ req.reset_mock()
+
+ # No data returned because the server is a teapot
+ req.status_code = 418
+ res = self.dl.fetch_plain_tle()
+ # The sources are in the dict ...
+ self.assertEqual(len(res), 2)
+ # ... but there are no TLEs
+ self.assertEqual(len(res["source_1"]), 0)
+ self.assertEqual(len(res["source_2"]), 0)
+ self.assertTrue(mock.call("mocked_url_1") in requests.get.mock_calls)
+ self.assertEqual(len(requests.get.mock_calls), 4)
+
+ @mock.patch('pyorbital.tlefile.requests')
+ def test_fetch_spacetrack(self, requests):
+ """Test downloading TLEs from space-track.org."""
+ mock_post = mock.MagicMock()
+ mock_get = mock.MagicMock()
+ mock_session = mock.MagicMock()
+ mock_session.post = mock_post
+ mock_session.get = mock_get
+ requests.Session.return_value.__enter__.return_value = mock_session
+
+ tle_text = '\n'.join((line0, line1, line2))
+ self.dl.config["platforms"] = {
+ 25544: 'ISS'
+ }
+ self.dl.config["downloaders"] = {
+ "fetch_spacetrack": {
+ "user": "username",
+ "password": "passw0rd"
+ }
+ }
+
+ # Login fails, because the server is a teapot
+ mock_post.return_value.status_code = 418
+ res = self.dl.fetch_spacetrack()
+ # Empty list of TLEs is returned
+ self.assertTrue(res == [])
+ # The login was anyway attempted
+ mock_post.assert_called_with(
+ 'https://www.space-track.org/ajaxauth/login',
+ data={'identity': 'username', 'password': 'passw0rd'})
+
+ # Login works, but something is wrong (teapot) when asking for data
+ mock_post.return_value.status_code = 200
+ mock_get.return_value.status_code = 418
+ res = self.dl.fetch_spacetrack()
+ self.assertTrue(res == [])
+ mock_get.assert_called_with("https://www.space-track.org/"
+ "basicspacedata/query/class/tle_latest/"
+ "ORDINAL/1/NORAD_CAT_ID/25544/format/tle")
+
+ # Data is received
+ mock_get.return_value.status_code = 200
+ mock_get.return_value.text = tle_text
+ res = self.dl.fetch_spacetrack()
+ self.assertEqual(len(res), 1)
+ self.assertEqual(res[0].line1, line1)
+ self.assertEqual(res[0].line2, line2)
+
+ def test_read_tle_files(self):
+ """Test reading TLE files from a file system."""
+ from tempfile import TemporaryDirectory
+ import os
+
+ tle_text = '\n'.join((line0, line1, line2))
+
+ save_dir = TemporaryDirectory()
+ with save_dir:
+ fname = os.path.join(save_dir.name, 'tle_20200129_1600.txt')
+ with open(fname, 'w') as fid:
+ fid.write(tle_text)
+ # Add a non-existent file, it shouldn't cause a crash
+ nonexistent = os.path.join(save_dir.name, 'not_here.txt')
+ # Use a wildcard to collect files (passed to glob)
+ starred_fname = os.path.join(save_dir.name, 'tle*txt')
+ self.dl.config["downloaders"] = {
+ "read_tle_files": {
+ "paths": [fname, nonexistent, starred_fname]
+ }
+ }
+ res = self.dl.read_tle_files()
+ self.assertEqual(len(res), 2)
+ self.assertEqual(res[0].line1, line1)
+ self.assertEqual(res[0].line2, line2)
+
+ def test_parse_tles(self):
+ """Test TLE parsing."""
+ tle_text = '\n'.join((line0, line1, line2))
+
+ # Valid data
+ res = self.dl.parse_tles(tle_text)
+ self.assertEqual(len(res), 1)
+ self.assertEqual(res[0].line1, line1)
+ self.assertEqual(res[0].line2, line2)
+
+ # Only one valid line
+ res = self.dl.parse_tles(line1 + '\nbar')
+ self.assertTrue(res == [])
+
+ # Valid start of the lines, but bad data
+ res = self.dl.parse_tles('1 foo\n2 bar')
+ self.assertTrue(res == [])
+
+ # Something wrong in the data
+ bad_line2 = '2 ' + 'x' * (len(line2)-2)
+ res = self.dl.parse_tles('\n'.join((line1, bad_line2)))
+ self.assertTrue(res == [])
+
+
+class TestSQLiteTLE(unittest.TestCase):
+ """Test saving TLE data to a SQLite database."""
+
+ def setUp(self):
+ """Create a database instance."""
+ from pyorbital.tlefile import SQLiteTLE
+ from pyorbital.tlefile import Tle
+ from tempfile import TemporaryDirectory
+
+ self.temp_dir = TemporaryDirectory()
+ self.db_fname = os.path.join(self.temp_dir.name, 'tle.db')
+ self.platforms = {25544: "ISS"}
+ self.writer_config = {
+ "output_dir": os.path.join(self.temp_dir.name, 'tle_dir'),
+ "filename_pattern": "tle_%Y%m%d_%H%M%S.%f.txt",
+ "write_name": True,
+ "write_always": False
+ }
+ self.db = SQLiteTLE(self.db_fname, self.platforms, self.writer_config)
+ self.tle = Tle('ISS', line1=line1, line2=line2)
+
+ def tearDown(self):
+ """Clean temporary files."""
+ self.temp_dir.cleanup()
+
+ def test_init(self):
+ """Test that the init did what it should have."""
+ from pyorbital.tlefile import table_exists, PLATFORM_NAMES_TABLE
+
+ columns = [col.strip() for col in
+ PLATFORM_NAMES_TABLE.strip('()').split(',')]
+ num_columns = len(columns)
+
+ self.assertTrue(os.path.exists(self.db_fname))
+ self.assertTrue(table_exists(self.db.db, "platform_names"))
+ res = self.db.db.execute('select * from platform_names')
+ names = [description[0] for description in res.description]
+ self.assertEqual(len(names), num_columns)
+ for col in columns:
+ self.assertTrue(col.split(' ')[0] in names)
+
+ def test_update_db(self):
+ """Test updating database with new data."""
+ from pyorbital.tlefile import (table_exists, SATID_TABLE,
+ ISO_TIME_FORMAT)
+
+ # Get the column names
+ columns = [col.strip() for col in
+ SATID_TABLE.replace("'{}' (", "").strip(')').split(',')]
+ # Platform number
+ satid = str(list(self.platforms.keys())[0])
+
+ # Data from a platform that isn't configured
+ self.db.platforms = {}
+ self.db.update_db(self.tle, 'foo')
+ self.assertFalse(table_exists(self.db.db, satid))
+ self.assertFalse(self.db.updated)
+
+ # Configured platform
+ self.db.platforms = self.platforms
+ self.db.update_db(self.tle, 'foo')
+ self.assertTrue(table_exists(self.db.db, satid))
+ self.assertTrue(self.db.updated)
+
+ # Check that all the columns were added
+ res = self.db.db.execute("select * from '%s'" % satid)
+ names = [description[0] for description in res.description]
+ for col in columns:
+ self.assertTrue(col.split(' ')[0] in names)
+
+ # Check the data
+ data = res.fetchall()
+ self.assertEqual(len(data), 1)
+ # epoch
+ self.assertEqual(data[0][0], '2008-09-20T12:25:40.104192')
+ # TLE
+ self.assertEqual(data[0][1], '\n'.join((line1, line2)))
+ # Date when the data were added should be close to current time
+ date_added = datetime.datetime.strptime(data[0][2], ISO_TIME_FORMAT)
+ now = datetime.datetime.utcnow()
+ self.assertTrue((now - date_added).total_seconds() < 1.0)
+ # Source of the data
+ self.assertTrue(data[0][3] == 'foo')
+
+ # Try to add the same data again. If the epoch is the same,
+ # nothing should change even if the source differs
+ self.db.update_db(self.tle, 'bar')
+ res = self.db.db.execute("select * from '%s'" % satid)
+ data = res.fetchall()
+ self.assertEqual(len(data), 1)
+ date_added2 = datetime.datetime.strptime(data[0][2], ISO_TIME_FORMAT)
+ self.assertEqual(date_added, date_added2)
+ # Source of the data
+ self.assertTrue(data[0][3] == 'foo')
+
+ def test_write_tle_txt(self):
+ """Test reading data from the database and writing it to a file."""
+ import glob
+ tle_dir = self.writer_config["output_dir"]
+
+ # Put some data in the database
+ self.db.update_db(self.tle, 'foo')
+
+ # Fake that the database hasn't been updated
+ self.db.updated = False
+
+ # Try to dump the data to disk
+ self.db.write_tle_txt()
+
+ # The output dir hasn't been created
+ self.assertFalse(os.path.exists(tle_dir))
+
+ self.db.updated = True
+ self.db.write_tle_txt()
+
+ # The dir should be there
+ self.assertTrue(os.path.exists(tle_dir))
+ # There should be one file in the directory
+ files = glob.glob(os.path.join(tle_dir, 'tle_*txt'))
+ self.assertEqual(len(files), 1)
+ # The file should have been named with the date ('%' characters
+ # not there anymore)
+ self.assertTrue('%' not in files[0])
+ # The satellite name should be in the file
+ with open(files[0], 'r') as fid:
+ data = fid.read().split('\n')
+ self.assertEqual(len(data), 3)
+ self.assertTrue('ISS' in data[0])
+ self.assertEqual(data[1], line1)
+ self.assertEqual(data[2], line2)
+
+ # Call the writer again; nothing should be written. In real
+ # life this corresponds to a re-run without new TLE data
+ self.db.updated = False
+ self.db.write_tle_txt()
+ files = glob.glob(os.path.join(tle_dir, 'tle_*txt'))
+ self.assertEqual(len(files), 1)
+
+ # Force writing with every call
+ # Do not write the satellite name
+ self.db.writer_config["write_always"] = True
+ self.db.writer_config["write_name"] = False
+ self.db.write_tle_txt()
+ files = sorted(glob.glob(os.path.join(tle_dir, 'tle_*txt')))
+ self.assertEqual(len(files), 2)
+ with open(files[1], 'r') as fid:
+ data = fid.read().split('\n')
+ self.assertEqual(len(data), 2)
+ self.assertEqual(data[0], line1)
+ self.assertEqual(data[1], line2)
+
+
def suite():
- """The suite for test_tlefile
- """
+ """Create the test suite for test_tlefile."""
loader = unittest.TestLoader()
mysuite = unittest.TestSuite()
mysuite.addTest(loader.loadTestsFromTestCase(TLETest))
+ mysuite.addTest(loader.loadTestsFromTestCase(TestDownloader))
+ mysuite.addTest(loader.loadTestsFromTestCase(TestSQLiteTLE))
return mysuite
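
The Space-Track tests above drive the downloader by forcing status codes on the mocked `requests.post`/`requests.get`. The same pattern can be sketched with only the standard library; `fetch`, the URLs, and the credential dict below are illustrative stand-ins, not pyorbital API:

```python
from unittest import mock

def fetch(session, login_url, data_url, credentials):
    """Log in, then download; return the response text or None on failure."""
    # Log in first; bail out on any non-200 status.
    if session.post(login_url, data=credentials).status_code != 200:
        return None
    # Then request the data; again only accept HTTP 200.
    resp = session.get(data_url)
    return resp.text if resp.status_code == 200 else None

session = mock.Mock()
# Login fails (HTTP 418): no data request is ever made.
session.post.return_value.status_code = 418
assert fetch(session, "https://example/login", "https://example/data", {}) is None
session.get.assert_not_called()

# Login and download both succeed: the response text is returned.
session.post.return_value.status_code = 200
session.get.return_value.status_code = 200
session.get.return_value.text = "1 ...\n2 ..."
assert fetch(session, "https://example/login", "https://example/data", {}) == "1 ...\n2 ..."
```

This is the same "teapot" flow exercised by `test_fetch_spacetrack`: the mock's `return_value` stands in for the HTTP response object, so both status code and body can be scripted without any network access.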
=====================================
pyorbital/tlefile.py
=====================================
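
Among the changes below, `_parse_tle` builds the epoch from the two-digit year (line 1, columns 19-20) and the fractional day of year (columns 21-32). A stdlib sketch of that conversion, using the standard ISS sample element set (its epoch matches the value asserted in `test_update_db` above):

```python
import datetime as dt

def tle_epoch(line1):
    """Convert the epoch field of TLE line 1 to a datetime."""
    # Columns 19-32: two-digit year, then fractional day of year.
    year = line1[18:20]
    day = float(line1[20:32])
    # Day 1.0 is January 1st 00:00 UTC, hence the "day - 1" offset.
    return dt.datetime.strptime(year, "%y") + dt.timedelta(days=day - 1)

# Classic ISS sample line; epoch field is "08264.51782528".
line1 = "1 25544U 98067A   08264.51782528 -.00002182  00000-0 -11606-4 0  2927"
assert tle_epoch(line1).isoformat() == "2008-09-20T12:25:40.104192"
```

Day 264 of the leap year 2008 is September 20, and 0.51782528 of a day is 44740.104192 seconds, i.e. 12:25:40.104192 UTC.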
@@ -1,31 +1,33 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-
+#
# Copyright (c) 2011 - 2018
-
+#
# Author(s):
-
+#
# Esben S. Nielsen <esn at dmi.dk>
# Martin Raspaud <martin.raspaud at smhi.se>
# Panu Lahtinen <panu.lahtinen at fmi.fi>
-
+# Will Evonosky <william.evonosky at gmail.com>
+#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
-
+#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
-
+#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""Classes and functions for handling TLE files."""
import io
import logging
-import datetime
+import datetime as dt
try:
from urllib2 import urlopen
except ImportError:
@@ -33,21 +35,26 @@ except ImportError:
import os
import glob
import numpy as np
+import requests
+import sqlite3
-TLE_URLS = ('http://celestrak.com/NORAD/elements/weather.txt',
+TLE_URLS = ('http://www.celestrak.com/NORAD/elements/active.txt',
+ 'http://celestrak.com/NORAD/elements/weather.txt',
'http://celestrak.com/NORAD/elements/resource.txt',
'https://www.celestrak.com/NORAD/elements/cubesat.txt',
'http://celestrak.com/NORAD/elements/stations.txt',
'https://www.celestrak.com/NORAD/elements/sarsat.txt',
'https://www.celestrak.com/NORAD/elements/noaa.txt',
- 'https://www.celestrak.com/NORAD/elements/amateur.txt')
+ 'https://www.celestrak.com/NORAD/elements/amateur.txt',
+ 'https://www.celestrak.com/NORAD/elements/engineering.txt')
+
LOGGER = logging.getLogger(__name__)
PKG_CONFIG_DIR = os.path.join(os.path.realpath(os.path.dirname(__file__)), 'etc')
def read_platform_numbers(in_upper=False, num_as_int=False):
- """Read platform numbers from $PPP_CONFIG_DIR/platforms.txt if available."""
+ """Read platform numbers from $PPP_CONFIG_DIR/platforms.txt."""
out_dict = {}
os.getenv('PPP_CONFIG_DIR', PKG_CONFIG_DIR)
platform_file = None
@@ -78,21 +85,23 @@ def read_platform_numbers(in_upper=False, num_as_int=False):
SATELLITES = read_platform_numbers(in_upper=True, num_as_int=False)
-'''
+"""
The platform numbers are given in a file $PPP_CONFIG/platforms.txt
in the following format:
.. literalinclude:: ../../etc/platforms.txt
:language: text
:lines: 4-
-'''
+"""
def read(platform, tle_file=None, line1=None, line2=None):
- """Read TLE for `platform` from `tle_file`
+ """Read TLE for *platform*.
+
+ The data are read from *tle_file*, from *line1* and *line2*, from
+ the newest file provided in the TLES pattern, or from internet if
+ none is provided.
- File is read from `line1` to `line2`, from the newest file provided in the
- TLES pattern, or from internet if none is provided.
"""
return Tle(platform, tle_file=tle_file, line1=line1, line2=line2)
@@ -107,6 +116,7 @@ def fetch(destination):
class ChecksumError(Exception):
"""ChecksumError."""
+
pass
@@ -114,6 +124,7 @@ class Tle(object):
"""Class holding TLE objects."""
def __init__(self, platform, tle_file=None, line1=None, line2=None):
+ """Init."""
self._platform = platform.strip().upper()
self._tle_file = tle_file
self._line1 = line1
@@ -160,7 +171,7 @@ class Tle(object):
return self._platform
def _checksum(self):
- """Performs the checksum for the current TLE."""
+ """Calculate checksum for the current TLE."""
for line in [self._line1, self._line2]:
check = 0
for char in line[:-1]:
@@ -226,7 +237,6 @@ class Tle(object):
def _parse_tle(self):
"""Parse values from TLE data."""
-
def _read_tle_decimal(rep):
"""Convert *rep* to decimal value."""
if rep[0] in ["-", " ", "+"]:
@@ -246,8 +256,8 @@ class Tle(object):
self.epoch_year = self._line1[18:20]
self.epoch_day = float(self._line1[20:32])
self.epoch = \
- np.datetime64(datetime.datetime.strptime(self.epoch_year, "%y") +
- datetime.timedelta(days=self.epoch_day - 1), 'us')
+ np.datetime64(dt.datetime.strptime(self.epoch_year, "%y") +
+ dt.timedelta(days=self.epoch_day - 1), 'us')
self.mean_motion_derivative = float(self._line1[33:43])
self.mean_motion_sec_derivative = _read_tle_decimal(self._line1[44:52])
self.bstar = _read_tle_decimal(self._line1[53:61])
@@ -266,6 +276,7 @@ class Tle(object):
self.orbit = int(self._line2[63:68])
def __str__(self):
+ """Format the class data for printing."""
import pprint
import sys
if sys.version_info < (3, 0):
@@ -279,8 +290,224 @@ class Tle(object):
return s_var.getvalue()[:-1]
+PLATFORM_NAMES_TABLE = "(satid text primary key, platform_name text)"
+SATID_TABLE = ("'{}' (epoch date primary key, tle text, insertion_time date,"
+ " source text)")
+SATID_VALUES = "INSERT INTO '{}' VALUES (?, ?, ?, ?)"
+PLATFORM_VALUES = "INSERT INTO platform_names VALUES (?, ?)"
+ISO_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
+
+
+class Downloader(object):
+ """Class for downloading TLE data."""
+
+ def __init__(self, config):
+ """Init."""
+ self.config = config
+
+ def fetch_plain_tle(self):
+ """Fetch plain-text formatted TLE data."""
+ tles = {}
+ if "fetch_plain_tle" in self.config["downloaders"]:
+ sources = self.config["downloaders"]["fetch_plain_tle"]
+ for source in sources:
+ tles[source] = []
+ failures = []
+ for uri in sources[source]:
+ req = requests.get(uri)
+ if req.status_code == 200:
+ tles[source] += self.parse_tles(req.text)
+ else:
+ failures.append(uri)
+ if len(failures) > 0:
+ logging.error(
+ "Could not fetch TLEs from %s, %d failure(s): [%s]",
+ source, len(failures), ', '.join(failures))
+ logging.info("Downloaded %d TLEs from %s",
+ len(tles[source]), source)
+ return tles
+
+ def fetch_spacetrack(self):
+ """Fetch TLE data from Space-Track."""
+ tles = []
+ login_url = "https://www.space-track.org/ajaxauth/login"
+ download_url = ("https://www.space-track.org/basicspacedata/query/"
+ "class/tle_latest/ORDINAL/1/NORAD_CAT_ID/%s/format/"
+ "tle")
+ download_url = download_url % ','.join(
+ [str(key) for key in self.config['platforms']])
+
+ user = self.config["downloaders"]["fetch_spacetrack"]["user"]
+ password = self.config["downloaders"]["fetch_spacetrack"]["password"]
+ credentials = {"identity": user, "password": password}
+
+ with requests.Session() as session:
+ # Login
+ req = session.post(login_url, data=credentials)
+
+ if req.status_code != 200:
+ logging.error("Could not login to Space-Track")
+ return tles
+
+ # Get the data
+ req = session.get(download_url)
+
+ if req.status_code == 200:
+ tles += self.parse_tles(req.text)
+ else:
+ logging.error("Could not retrieve TLEs from Space-Track")
+
+ logging.info("Downloaded %d TLEs from %s", len(tles), "spacetrack")
+
+ return tles
+
+ def read_tle_files(self):
+ """Read TLE data from files."""
+ paths = self.config["downloaders"]["read_tle_files"]["paths"]
+
+ # Collect filenames
+ fnames = []
+ for path in paths:
+ if '*' in path:
+ fnames += glob.glob(path)
+ else:
+ if not os.path.exists(path):
+ logging.error("File %s doesn't exist.", path)
+ continue
+ fnames += [path]
+
+ tles = []
+ for fname in fnames:
+ with open(fname, 'r') as fid:
+ data = fid.read()
+ tles += self.parse_tles(data)
+
+ logging.info("Loaded %d TLEs from local files", len(tles))
+
+ return tles
+
+ def parse_tles(self, raw_data):
+ """Parse all the TLEs in the given raw text data."""
+ tles = []
+ line1, line2 = None, None
+ raw_data = raw_data.split('\n')
+ for row in raw_data:
+ if row.startswith('1 '):
+ line1 = row
+ elif row.startswith('2 '):
+ line2 = row
+ else:
+ continue
+ if line1 is not None and line2 is not None:
+ try:
+ tle = Tle('', line1=line1, line2=line2)
+ except ValueError:
+ logging.warning(
+ "Invalid data found - line1: %s, line2: %s",
+ line1, line2)
+ else:
+ tles.append(tle)
+ line1, line2 = None, None
+ return tles
+
+
+class SQLiteTLE(object):
+ """Store TLE data in a sqlite3 database."""
+
+ def __init__(self, db_location, platforms, writer_config):
+ """Init."""
+ self.db = sqlite3.connect(db_location)
+ self.platforms = platforms
+ self.writer_config = writer_config
+ self.updated = False
+
+ # Create platform_names table if it doesn't exist
+ if not table_exists(self.db, "platform_names"):
+ cmd = "CREATE TABLE platform_names " + PLATFORM_NAMES_TABLE
+ with self.db:
+ self.db.execute(cmd)
+ logging.info("Created database table 'platform_names'")
+
+ def update_db(self, tle, source):
+ """Update the collected data.
+
+ Data with an epoch already present in the database are ignored.
+
+ """
+ num = int(tle.satnumber)
+ if num not in self.platforms:
+ return
+ tle.platform_name = self.platforms[num]
+ if not table_exists(self.db, num):
+ cmd = "CREATE TABLE " + SATID_TABLE.format(num)
+ with self.db:
+ self.db.execute(cmd)
+ logging.info("Created database table '%d'", num)
+ cmd = ""
+ with self.db:
+ self.db.execute(PLATFORM_VALUES, (num, self.platforms[num]))
+ logging.info("Added platform name '%s' for ID '%d'",
+ self.platforms[num], num)
+ cmd = SATID_VALUES.format(num)
+ epoch = tle.epoch.item().isoformat()
+ tle = '\n'.join([tle.line1, tle.line2])
+ now = dt.datetime.utcnow().isoformat()
+ try:
+ with self.db:
+ self.db.execute(cmd, (epoch, tle, now, source))
+ logging.info("Added TLE for %d (%s), epoch: %s, source: %s",
+ num, self.platforms[num], epoch, source)
+ self.updated = True
+ except sqlite3.IntegrityError:
+ pass
+
+ def write_tle_txt(self):
+ """Write TLE data to a text file."""
+ if not self.updated and not self.writer_config.get('write_always',
+ False):
+ return
+ pattern = os.path.join(self.writer_config["output_dir"],
+ self.writer_config["filename_pattern"])
+ now = dt.datetime.utcnow()
+ fname = now.strftime(pattern)
+ out_dir = os.path.dirname(fname)
+ if not os.path.exists(out_dir):
+ os.makedirs(out_dir)
+ logging.info("Created directory %s", out_dir)
+ data = []
+
+ for satid, platform_name in self.platforms.items():
+ if self.writer_config.get("write_name", False):
+ data.append(platform_name)
+ query = ("SELECT epoch, tle FROM '%s' ORDER BY "
+ "epoch DESC LIMIT 1" % satid)
+ epoch, tle = self.db.execute(query).fetchone()
+ date_epoch = dt.datetime.strptime(epoch, ISO_TIME_FORMAT)
+ tle_age = (
+ dt.datetime.utcnow() - date_epoch).total_seconds() / 3600.
+ logging.info("Latest TLE for '%s' (%s) is %d hours old.",
+ satid, platform_name, int(tle_age))
+ data.append(tle)
+
+ with open(fname, 'w') as fid:
+ fid.write('\n'.join(data))
+
+ logging.info("Wrote %d TLEs to %s", len(data), fname)
+
+ def close(self):
+ """Close the database."""
+ self.db.close()
+
+
+def table_exists(db, name):
+ """Check if the table 'name' exists in the database."""
+ name = str(name)
+ query = "SELECT 1 FROM sqlite_master WHERE type='table' and name=?"
+ return db.execute(query, (name,)).fetchone() is not None
+
+
def main():
- """Main for testing TLE reading."""
+ """Run a test TLE reading."""
tle_data = read('Noaa-19')
print(tle_data)
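
The `SQLiteTLE.update_db` method above relies on the `epoch` column being the table's primary key: re-inserting a row with the same epoch raises `sqlite3.IntegrityError`, which is swallowed, so duplicates are ignored regardless of their source. A minimal stdlib sketch of that dedup pattern (the table name and row values here are illustrative):

```python
import sqlite3

# In-memory database; epoch is the primary key, as in the SATID_TABLE schema.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE '25544' (epoch date primary key, tle text, "
           "insertion_time date, source text)")

def insert_tle(db, epoch, tle, now, source):
    """Insert a TLE row; return False if the epoch is already stored."""
    try:
        # The connection context manager commits on success, rolls back on error.
        with db:
            db.execute("INSERT INTO '25544' VALUES (?, ?, ?, ?)",
                       (epoch, tle, now, source))
        return True
    except sqlite3.IntegrityError:
        return False

assert insert_tle(db, "2008-09-20T12:25:40.104192", "1 ...\n2 ...", "t0", "foo")
# Same epoch again: the primary-key constraint rejects it, even from a new source.
assert not insert_tle(db, "2008-09-20T12:25:40.104192", "1 ...\n2 ...", "t1", "bar")
```

Letting the database enforce uniqueness keeps the ingest loop simple: callers just attempt the insert and treat a constraint violation as "already have this epoch".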
=====================================
pyorbital/version.py
=====================================
@@ -23,9 +23,9 @@ def get_keywords():
# setup.py/versioneer.py will grep for the variable names, so they must
# each be defined on a line of their own. _version.py will just call
# get_keywords().
- git_refnames = " (tag: v1.5.0)"
- git_full = "5c8ac6fe8b94e6369d1c5d6c489d94a8f6667276"
- git_date = "2018-11-16 14:01:27 -0600"
+ git_refnames = " (HEAD -> master, tag: v1.6.0)"
+ git_full = "3ff7a6b1631deea9aed6ef227017e74f3ebdc379"
+ git_date = "2020-06-24 13:56:04 +0200"
keywords = {"refnames": git_refnames, "full": git_full, "date": git_date}
return keywords
=====================================
setup.py
=====================================
@@ -1,25 +1,28 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-
-# Copyright (c) 2011-2014, 2018
-
+#
+# Copyright (c) 2011-2014, 2018, 2020
+#
# Author(s):
-
+#
# Martin Raspaud <martin.raspaud at smhi.se>
-
+# Panu Lahtinen <panu.lahtinen at fmi.fi>
+#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
-
+#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
-
+#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""Setup for pyorbital."""
+
import os
from setuptools import setup, find_packages
import versioneer
@@ -42,6 +45,7 @@ setup(name='pyorbital',
test_suite='pyorbital.tests.suite',
packages=find_packages(),
package_data={'pyorbital': [os.path.join('etc', 'platforms.txt')]},
+ scripts=['bin/fetch_tles.py', ],
install_requires=['numpy>=1.11.0,!=1.14.0', 'scipy'],
zip_safe=False,
)
View it on GitLab: https://salsa.debian.org/debian-gis-team/pyorbital/-/commit/5376b35419248496ce27f9cd8b725102c39c2f7a