[med-svn] [Git][med-team/gnumed-server][upstream] New upstream version 22.33
Marc Angermann (@marc_a)
gitlab at salsa.debian.org
Sun Mar 1 06:47:21 GMT 2026
Marc Angermann pushed to branch upstream at Debian Med / gnumed-server
Commits:
57cf220e by Marc Angermann at 2026-03-01T07:29:12+01:00
New upstream version 22.33
- - - - -
11 changed files:
- server/bootstrap/bootstrap_gm_db_system.py
- server/bootstrap/fixup_db-v22.conf
- server/bootstrap/update_db-v21_v22.conf
- server/doc/schema/gnumed-entire_schema.html
- + server/pycommon/_mailcap__copy.py
- server/pycommon/gmMimeLib.py
- server/pycommon/gmPG2.py
- server/pycommon/gmTools.py
- + server/sql/v21-v22/fixups/v22-gm-dbo_grants-fixup.sql
- server/sql/v21-v22/fixups/v22-release_notes-fixup.sql
- + server/sql/v21-v22/superuser/v22-gm-user_needs_md5_2_scramsha256_pwd_switch.sql
Changes:
=====================================
server/bootstrap/bootstrap_gm_db_system.py
=====================================
@@ -37,6 +37,12 @@ Requires psycopg 2.7.4 !
__author__ = "Karsten.Hilbert at gmx.net"
__license__ = "GPL v2 or later"
+
+_GM_LOGINS_GROUP = 'gm-logins'
+_GM_DBO_ROLE = 'gm-dbo'
+_PG_SUPERUSER = 'postgres'
+
+
# standard library
import sys
if sys.hexversion < 0x3000000:
@@ -215,53 +221,67 @@ CREATE INDEX %(idx_name)s ON %(idx_schema)s.%(idx_table)s(%(idx_col)s);
#==================================================================
def user_exists(cursor=None, user=None):
- cmd = "SELECT usename FROM pg_user WHERE usename = %(usr)s"
+ SQL = "SELECT usename FROM pg_user WHERE usename = %(usr)s"
args = {'usr': user}
try:
- cursor.execute(cmd, args)
+ cursor.execute(SQL, args)
except:
- _log.exception(u">>>[%s]<<< failed for user [%s]", cmd, user)
+ _log.exception(u">>>[%s]<<< failed for user [%s]", SQL, user)
return None
+
res = cursor.fetchone()
if cursor.rowcount == 1:
_log.info(u"user [%s] exists", user)
return True
+
_log.info(u"user [%s] does not exist", user)
return None
+
#------------------------------------------------------------------
-def db_group_exists(cursor=None, group=None):
- cmd = 'SELECT groname FROM pg_group WHERE groname = %(grp)s'
+def db_named_group_role_exists(cursor=None, group=None):
+ SQL = 'SELECT groname FROM pg_group WHERE groname = %(grp)s'
args = {'grp': group}
try:
- cursor.execute(cmd, args)
+ cursor.execute(SQL, args)
except:
- _log.exception(u">>>[%s]<<< failed for group [%s]", cmd, group)
+ _log.exception(">>>[%s]<<< failed for group [%s]", SQL, group)
return False
+
rows = cursor.fetchall()
- if len(rows) > 0:
- _log.info(u"group [%s] exists" % group)
+ if rows:
+ _log.info("group [%s] exists" % group)
return True
- _log.info(u"group [%s] does not exist" % group)
+
+ _log.info("group [%s] does not exist" % group)
return False
+
#------------------------------------------------------------------
-def create_db_group(cursor=None, group=None):
+def create_db_named_group_role(cursor=None, group=None):
# does this group already exist ?
- if db_group_exists(cursor, group):
+ if db_named_group_role_exists(cursor, group):
return True
- cmd = 'create group "%s"' % group
+ SQL = 'create group "%s"' % group
try:
- cursor.execute(cmd)
+ cursor.execute(SQL)
except:
- _log.exception(u">>>[%s]<<< failed for group [%s]", cmd, group)
+ _log.exception(u">>>[%s]<<< failed for group [%s]", SQL, group)
+ return False
+
+ SQL = 'GRANT "%s" to "%s" WITH ADMIN OPTION;' % (group, _GM_DBO_ROLE)
+ try:
+ cursor.execute(SQL)
+ except:
+ _log.exception(u">>>[%s]<<< failed for group [%s]", SQL, group)
return False
# paranoia is good
- if not db_group_exists(cursor, group):
+ if not db_named_group_role_exists(cursor, group):
return False
return True
+
#==================================================================
def connect(host, port, db, user, passwd, conn_name=None):
"""
@@ -308,42 +328,47 @@ class user:
raise ConstructorError("cannot get user name")
self.password = aPassword
+ if self.password is not None:
+ return None
# password not passed in, try to get it from elsewhere
+ # look into config file
+ self.password = cfg_get(self.group, "password")
+ # undefined or commented out:
+ # this means the user does not need a password
+ # but connects via IDENT or TRUST
if self.password is None:
- # look into config file
- self.password = cfg_get(self.group, "password")
- # undefined or commented out:
- # this means the user does not need a password
- # but connects via IDENT or TRUST
- if self.password is None:
- _log.info(u'password not defined, assuming connect via IDENT/TRUST')
- # defined but empty:
- # this means to ask the user if interactive
- elif self.password == '':
- if _interactive or force_interactive:
- _log.info('password for [%s] defined as "", asking user', self.name)
- print("I need the password for the database user [%s]." % self.name)
- self.password = getpass.getpass("Please type the password: ")
- _log.info('got password')
- pwd4check = None
- while pwd4check != self.password:
- _log.info('asking for confirmation')
- pwd2 = getpass.getpass("Please retype the password: ")
- if pwd2 == self.password:
- break
- _log.error('password mismatch, asking again')
- print('Password mismatch. Try again or CTRL-C to abort.')
- else:
- _log.warning('password for [%s] defined as "" (meaning <ask-user>), but running non-interactively, aborting', self.name)
- _log.warning('cannot get password for database user [%s]', self.name)
- raise ValueError('no password for user %s' % self.name)
+ _log.info(u'password not defined, assuming connect via IDENT/TRUST')
+ return None
- return None
+ if self.password != '':
+ _log.info('password taken from config file')
+ return None
+
+ # defined but empty:
+ # this means to ask the user if interactive
+ if _interactive or force_interactive:
+ _log.info('password for [%s] defined as "", asking user', self.name)
+ print("I need the password for the database user [%s]." % self.name)
+ self.password = getpass.getpass("Please type the password: ")
+ _log.info('got password')
+ pwd4check = None
+ while pwd4check != self.password:
+ _log.info('asking for confirmation')
+ pwd2 = getpass.getpass("Please retype the password: ")
+ if pwd2 == self.password:
+ break
+ _log.error('password mismatch, asking again')
+ print('Password mismatch. Try again or CTRL-C to abort.')
+ return None
+
+ _log.warning('password for [%s] defined as "" (meaning <ask-user>), but running non-interactively, aborting', self.name)
+ _log.warning('cannot get password for database user [%s]', self.name)
+ raise ValueError('no password for user %s' % self.name)
#==================================================================
class db_server:
- def __init__(self, aSrv_alias, auth_group):
+ def __init__(self, aSrv_alias, db_named_group_role):
_log.info(u"bootstrapping server [%s]" % aSrv_alias)
global _bootstrapped_servers
@@ -354,7 +379,7 @@ class db_server:
self.alias = aSrv_alias
self.section = "server %s" % self.alias
- self.auth_group = auth_group
+ self.db_named_group_role = db_named_group_role
self.conn = None
if not self.__bootstrap():
@@ -373,7 +398,7 @@ class db_server:
return None
# add users/groups
- if not self.__bootstrap_db_users():
+ if not self.__bootstrap_roles():
_log.error(u"Cannot bootstrap database users.")
return None
@@ -420,8 +445,21 @@ class db_server:
curs = self.conn.cursor()
curs.execute(u"select setting from pg_settings where name = 'lc_ctype'")
data = curs.fetchall()
- lc_ctype = data[0][0]
- _log.info(u'template database LC_CTYPE is [%s]', lc_ctype)
+ if data:
+ lc_ctype = data[0][0]
+ _log.info(u'template database LC_CTYPE is [%s]', lc_ctype)
+ else:
+ # PG17+: lc_ctype/lc_collate are per-database attrs, not GUCs
+ curs.execute("SELECT datcollate, datctype FROM pg_database WHERE datname = current_database()")
+ row = curs.fetchone()
+ if not row:
+ _log.error('Could not read datcollate/datctype for template DB')
+ return None
+
+ lc_collate, lc_ctype = row
+ _log.info(u'template database LC_COLLATE is [%s]', lc_collate)
+ _log.info(u'template database LC_CTYPE is [%s]', lc_ctype)
+
lc_ctype = lc_ctype.lower()
if lc_ctype in ['c', 'posix']:
_log.warning('while this cluster setting allows to store databases')
@@ -448,15 +486,16 @@ class db_server:
_log.info(u"successfully connected to template database [%s]" % self.template_db)
return True
+
#--------------------------------------------------------------
# user and group related
#--------------------------------------------------------------
- def __bootstrap_db_users(self):
- _log.info(u"bootstrapping database users and groups")
+ def __bootstrap_roles(self):
+ _log.info(u"bootstrapping database roles")
# insert standard groups
if not self.__create_groups():
- _log.error(u"Cannot create GNUmed standard groups.")
+ _log.error(u"Cannot create GNUmed standard groups roles.")
return None
# create GNUmed owner
@@ -480,58 +519,67 @@ class db_server:
cursor = self.conn.cursor()
# does this user already exist ?
- name = cfg_get('user %s' % dbowner_alias, 'name')
- if user_exists(cursor, name):
- cmd = (
- 'alter group "gm-logins" add user "%s";' # postgres
- 'alter group "gm-logins" add user "%s";' # gm-dbo
- 'alter group "%s" add user "%s";'
- 'alter role "%s" createdb createrole;'
+ if user_exists(cursor, _GM_DBO_ROLE):
+ SQL = (
+ 'GRANT "%s" TO "%s";' # postgres in gm-logins (pg_dump/restore)
+ 'GRANT "%s" TO "%s" WITH ADMIN OPTION;' # gm-dbo in gm-logins; in v17 add: ", INHERIT FALSE, SET FALSE"
+ 'GRANT "%s" TO "%s" WITH ADMIN OPTION;' # gm-dbo in gnumed_vXX; in v17 add: ", INHERIT FALSE, SET FALSE"
+ 'ALTER ROLE "%s" CREATEDB CREATEROLE;'
) % (
- self.superuser.name,
- name,
- self.auth_group, name,
- name,
+ _GM_LOGINS_GROUP, _PG_SUPERUSER,
+ _GM_LOGINS_GROUP, _GM_DBO_ROLE,
+ self.db_named_group_role, _GM_DBO_ROLE,
+ _GM_DBO_ROLE
)
try:
- cursor.execute(cmd)
+ cursor.execute(SQL)
except:
- _log.error(u">>>[%s]<<< failed." % cmd)
- _log.exception(u"Cannot add GNUmed database owner [%s] to groups [gm-logins] and [%s]." % (name, self.auth_group))
+ _log.error(u">>>[%s]<<< failed." % SQL)
+ _log.exception("Cannot add GNUmed database owner [%s] to groups [%s] and [%s]." % (_GM_DBO_ROLE, _GM_LOGINS_GROUP, self.db_named_group_role))
cursor.close()
return False
+
self.conn.commit()
cursor.close()
_dbowner = user(anAlias = dbowner_alias, aPassword = 'should not matter')
return True
print_msg ((
-u"""The database owner [%s] will be created.
+"""The database owner [%s] will be created.
You will have to provide a new password for it
unless it is pre-defined in the configuration file.
Make sure to remember the password for later use !
-""") % name)
+""") % _GM_DBO_ROLE)
_dbowner = user(anAlias = dbowner_alias, force_interactive = True)
-
- cmd = 'create user "%s" with password \'%s\' createdb createrole in group "%s", "gm-logins"' % (_dbowner.name, _dbowner.password, self.auth_group)
+ SQLs = [
+ 'CREATE ROLE "%s" WITH ENCRYPTED PASSWORD \'%s\' CREATEDB CREATEROLE;' % (_GM_DBO_ROLE, _dbowner.password),
+ # gm-dbo in gm-logins; in v17 add: ", INHERIT FALSE, SET FALSE"
+ 'GRANT "%s" TO "%s" WITH ADMIN OPTION;' % (_GM_LOGINS_GROUP, _GM_DBO_ROLE),
+ # gm-dbo in gnumed_vXX; in v17 add: ", INHERIT FALSE, SET FALSE"
+ 'GRANT "%s" TO "%s" WITH ADMIN OPTION;' % (self.db_named_group_role, _GM_DBO_ROLE)
+
+ ]
+# SQL = 'CREATE ROLE "%s" WITH ENCRYPTED PASSWORD \'%s\' CREATEDB CREATEROLE IN GROUP "%s", "gm-logins"' % (_GM_DBO_ROLE, _dbowner.password, self.db_named_group_role)
try:
- cursor.execute(cmd)
+ for SQL in SQLs:
+ cursor.execute(SQL)
except:
- _log.error(u">>>[%s]<<< failed." % cmd)
- _log.exception(u"Cannot create GNUmed database owner [%s]." % _dbowner.name)
+ _log.error(">>>[%s]<<< failed.", SQL)
+ _log.exception("Cannot create GNUmed database owner [%s]." % _GM_DBO_ROLE)
cursor.close()
return None
# paranoia is good
- if not user_exists(cursor, _dbowner.name):
+ if not user_exists(cursor, _GM_DBO_ROLE):
cursor.close()
return None
self.conn.commit()
cursor.close()
return True
+
#--------------------------------------------------------------
def __create_groups(self, aSection = None):
@@ -543,19 +591,20 @@ Make sure to remember the password for later use !
groups = cfg_get(section, "groups")
if groups is None:
_log.error(u"Cannot load GNUmed group names from config file (section [%s])." % section)
- groups = [self.auth_group]
+ groups = [self.db_named_group_role]
else:
- groups.append(self.auth_group)
+ groups.append(self.db_named_group_role)
cursor = self.conn.cursor()
for group in groups:
- if not create_db_group(cursor, group):
+ if not create_db_named_group_role(cursor, group):
cursor.close()
return False
self.conn.commit()
cursor.close()
return True
+
#==================================================================
class database:
def __init__(self, aDB_alias):
@@ -604,7 +653,7 @@ class database:
raise ConstructorError("database.__init__(): Cannot bootstrap database.")
# make sure server is bootstrapped
- db_server(self.server_alias, auth_group = self.name)
+ db_server(self.server_alias, db_named_group_role = self.name)
self.server = _bootstrapped_servers[self.server_alias]
if not self.__bootstrap():
@@ -626,8 +675,6 @@ class database:
_log.error(u"Cannot load GNUmed database owner name from config file.")
return None
- self.owner = _dbowner
-
# connect as owner to template
if not self.__connect_superuser_to_template():
_log.error(u"Cannot connect to template database.")
@@ -658,7 +705,7 @@ class database:
# create authentication group
_log.info(u'creating database-specific authentication group role')
curs = self.conn.cursor()
- if not create_db_group(cursor = curs, group = self.name):
+ if not create_db_named_group_role(cursor = curs, group = self.name):
curs.close()
_log.error(u'cannot create authentication group role')
return False
@@ -667,7 +714,7 @@ class database:
# paranoia check
curs = self.conn.cursor()
- if not db_group_exists(cursor = curs, group = self.name):
+ if not db_named_group_role_exists(cursor = curs, group = self.name):
curs.close()
_log.error(u'cannot find authentication group role')
return False
@@ -813,24 +860,23 @@ class database:
self.conn.cookie = 'database.__connect_owner_to_db via database.__connect_superuser_to_db'
- _log.debug(u'setting session authorization to user %s', self.owner.name)
+ _log.debug('setting session authorization to user [%s]', _GM_DBO_ROLE)
curs = self.conn.cursor()
- cmd = "set session authorization %(usr)s"
- curs.execute(cmd, {'usr': self.owner.name})
+ SQL = "set session authorization %(usr)s"
+ curs.execute(SQL, {'usr': _GM_DBO_ROLE})
curs.close()
return self.conn and 1
#--------------------------------------------------------------
def __db_exists(self):
- #cmd = "BEGIN; SELECT datname FROM pg_database WHERE datname='%s'" % self.name
- cmd = "SELECT datname FROM pg_database WHERE datname='%s'" % self.name
+ SQL = "SELECT datname FROM pg_database WHERE datname='%s'" % self.name
aCursor = self.conn.cursor()
try:
- aCursor.execute(cmd)
+ aCursor.execute(SQL)
except:
- _log.exception(u">>>[%s]<<< failed." % cmd)
+ _log.exception(u">>>[%s]<<< failed." % SQL)
return None
res = aCursor.fetchall()
@@ -866,16 +912,16 @@ class database:
if drop_existing:
print_msg("==> dropping pre-existing target database [%s] ..." % self.name)
_log.info(u'trying to drop target database')
- cmd = 'DROP DATABASE "%s"' % self.name
+ SQL = 'DROP DATABASE "%s"' % self.name
# DROP DATABASE must be run outside transactions
self.conn.commit()
self.conn.set_session(readonly = False, autocommit = True)
cursor = self.conn.cursor()
try:
- _log.debug(u'running SQL: %s', cmd)
- cursor.execute(cmd)
+ _log.debug(u'running SQL: %s', SQL)
+ cursor.execute(SQL)
except:
- _log.exception(u">>>[%s]<<< failed" % cmd)
+ _log.exception(u">>>[%s]<<< failed" % SQL)
_log.debug(u'conn state after failed DROP: %s', gmConnectionPool.log_conn_state(self.conn))
return False
finally:
@@ -894,25 +940,25 @@ class database:
tablespace = cfg_get(self.section, 'tablespace')
if tablespace is None:
- create_db_cmd = """
+ create_db_SQL = """
CREATE DATABASE \"%s\" with
owner = \"%s\"
template = \"%s\"
encoding = 'unicode'
- ;""" % (self.name, self.owner.name, self.template_db)
+ ;""" % (self.name, _GM_DBO_ROLE, self.template_db)
else:
- create_db_cmd = """
+ create_db_SQL = """
CREATE DATABASE \"%s\" with
owner = \"%s\"
template = \"%s\"
encoding = 'unicode'
tablespace = '%s'
- ;""" % (self.name, self.owner.name, self.template_db, tablespace)
+ ;""" % (self.name, _GM_DBO_ROLE, self.template_db, tablespace)
# get size
cursor = self.conn.cursor()
- size_cmd = "SELECT pg_size_pretty(pg_database_size('%s'))" % self.template_db
- cursor.execute(size_cmd)
+ size_SQL = "SELECT pg_size_pretty(pg_database_size('%s'))" % self.template_db
+ cursor.execute(size_SQL)
size = cursor.fetchone()[0]
cursor.close()
@@ -923,9 +969,9 @@ class database:
self.conn.set_session(readonly = False, autocommit = True)
cursor = self.conn.cursor()
try:
- cursor.execute(create_db_cmd)
+ cursor.execute(create_db_SQL)
except:
- _log.exception(u">>>[%s]<<< failed" % create_db_cmd)
+ _log.exception(u">>>[%s]<<< failed" % create_db_SQL)
return False
finally:
cursor.close()
@@ -1887,6 +1933,7 @@ def main():
print("Done bootstrapping GNUmed database: We very likely succeeded.")
print('log:', gmLog2._logfile_name)
+#==================================================================
#==================================================================
if __name__ != "__main__":
print("This currently is not intended to be used as a module.")
=====================================
server/bootstrap/fixup_db-v22.conf
=====================================
@@ -69,7 +69,9 @@ $schema$
superuser schema = $superuser schema$
fixups/v22-gm-concat_table_structure_v19_and_up-fixup.sql
+fixups/v22-gm-dbo_grants-fixup.sql
superuser/v22-gm-load_auto_explain.sql
+superuser/v22-gm-user_needs_md5_2_scramsha256_pwd_switch.sql
$superuser schema$
upgrade plausibility checks = $upgrade plausibility checks$
=====================================
server/bootstrap/update_db-v21_v22.conf
=====================================
@@ -210,7 +210,9 @@ $schema$
superuser schema = $superuser schema$
fixups/v22-gm-concat_table_structure_v19_and_up-fixup.sql
+fixups/v22-gm-dbo_grants-fixup.sql
superuser/v22-gm-load_auto_explain.sql
+superuser/v22-gm-user_needs_md5_2_scramsha256_pwd_switch.sql
$superuser schema$
script base directory = ../sql/v21-v22/python/
=====================================
server/doc/schema/gnumed-entire_schema.html
=====================================
@@ -112,7 +112,7 @@
<body>
<!-- Primary Index -->
-<p><br><br>Dumped on 2025-04-20</p>
+<p><br><br>Dumped on 2025-12-29</p>
<h1><a name="index">Index of database - gnumed_v22</a></h1>
<ul>
=====================================
server/pycommon/_mailcap__copy.py
=====================================
@@ -0,0 +1,301 @@
+"""Mailcap file handling. See RFC 1524."""
+
+# 2022-10-29: Python 3.13 will remove this module
+# without replacement so we add a local copy from here:
+# https://github.com/python/cpython/blob/3.11/Lib/mailcap.py
+
+import os
+import warnings
+import re
+
+__all__ = ["getcaps","findmatch"]
+
+#_DEPRECATION_MSG = ('The {name} module is deprecated and will be removed in '
+# 'Python {remove}. See the mimetypes module for an '
+# 'alternative.')
+#warnings._deprecated(__name__, _DEPRECATION_MSG, remove=(3, 13))
+
+def lineno_sort_key(entry):
+    """Return the sort key used to order mailcap entries.
+
+    Entries carrying a 'lineno' field sort first, in ascending line
+    order; entries without one all share the key (1, 0) and so end up
+    after every numbered entry.
+    """
+    return (0, entry['lineno']) if 'lineno' in entry else (1, 0)
+
+# Search function returning a match for the first character that is NOT
+# considered shell-safe. Safe characters are word characters, the literal
+# punctuation "@+=:,./-" and the range U+00A1..U+10FFFF. The "@" must be
+# present literally in the class: spelling it " at " would whitelist the
+# space character and reject "@".
+_find_unsafe = re.compile(r'[^\xa1-\U0010FFFF\w@+=:,./-]').search
+
+class UnsafeMailcapInput(Warning):
+    """Warning category issued when input is rejected as unsafe.
+
+    Used with warnings.warn() by the code that refuses to place a
+    filename, MIME type or parameter into a shell command.
+    """
+
+# Part 1: top-level interface.
+
+def getcaps():
+    """Build the mailcap database from all readable mailcap files.
+
+    Returns a dictionary mapping a MIME type (lowercase, such as
+    'text/plain') to the list of entry dictionaries found for that type
+    across every file named by listmailcapfiles(). Files that cannot be
+    opened are skipped. In each entry the viewing command is stored
+    under the key "view".
+
+    """
+    merged = {}
+    next_lineno = 0
+    for path in listmailcapfiles():
+        try:
+            handle = open(path, 'r')
+        except OSError:
+            # unreadable or missing file: try the next candidate
+            continue
+        with handle:
+            # the running line counter keeps entries ordered across files
+            parsed, next_lineno = _readmailcapfile(handle, next_lineno)
+        for mimetype, entries in parsed.items():
+            if mimetype in merged:
+                merged[mimetype] = merged[mimetype] + entries
+            else:
+                merged[mimetype] = entries
+    return merged
+
+def listmailcapfiles():
+    """Return the list of mailcap file paths to consult.
+
+    When the MAILCAPS environment variable is set, its value is split on
+    the OS path separator and returned as is. Otherwise the per-user file
+    under $HOME (or under '.' when HOME is unset) is followed by the
+    fixed system-wide locations.
+    """
+    # This is mostly a Unix thing, but we use the OS path separator anyway
+    if 'MAILCAPS' in os.environ:
+        return os.environ['MAILCAPS'].split(os.pathsep)
+    # Don't bother with getpwuid(); '.' is the last resort
+    home = os.environ.get('HOME', '.')
+    return [
+        home + '/.mailcap',
+        '/etc/mailcap',
+        '/usr/etc/mailcap',
+        '/usr/local/etc/mailcap',
+    ]
+
+# Part 2: the parser.
+def readmailcapfile(fp):
+    """Deprecated: parse one open mailcap file into a dictionary.
+
+    Emits a DeprecationWarning attributed to the caller and returns the
+    dictionary produced by _readmailcapfile(); entries are not given a
+    'lineno' field.
+    """
+    warnings.warn('readmailcapfile is deprecated, use getcaps instead',
+                  DeprecationWarning, 2)
+    parsed, _unused_lineno = _readmailcapfile(fp, None)
+    return parsed
+
+def _readmailcapfile(fp, lineno):
+    """Parse an open mailcap file.
+
+    Returns a tuple (caps, lineno). caps maps each normalized MIME type
+    to a list of entry dictionaries, one per occurrence of that type in
+    the file, each holding the viewing command under "view". When lineno
+    is not None, every entry is tagged with a running 'lineno' value and
+    the updated counter is returned; otherwise None is returned back.
+    """
+    caps = {}
+    while True:
+        line = fp.readline()
+        if not line:
+            break
+        # Ignore comments and blank lines
+        if line[0] == '#' or line.strip() == '':
+            continue
+        # Join continuation lines (backslash directly before the newline);
+        # hitting EOF in the middle of a continuation ends the entry
+        tail = line
+        while tail[-2:] == '\\\n':
+            tail = fp.readline() or '\n'
+            line = line[:-2] + tail
+        # Parse the line
+        key, fields = parseline(line)
+        if not (key and fields):
+            continue
+        if lineno is not None:
+            fields['lineno'] = lineno
+            lineno += 1
+        # Normalize the key: strip blanks around each '/' part, lowercase
+        key = '/'.join(part.strip() for part in key.split('/')).lower()
+        # Update the database
+        caps.setdefault(key, []).append(fields)
+    return caps, lineno
+
+def parseline(line):
+    """Parse a single mailcap entry.
+
+    Returns (key, fields): key is the first ';'-separated field, fields
+    is a dictionary with the second field stored under "view" and every
+    further "name=value" field stored under its name. A field without
+    '=' is stored with an empty value; a repeated name keeps its first
+    value. Returns (None, None) if the line has fewer than two fields.
+    """
+    raw = []
+    pos, end = 0, len(line)
+    while pos < end:
+        chunk, pos = parsefield(line, pos, end)
+        raw.append(chunk)
+        pos += 1  # Skip semicolon
+    if len(raw) < 2:
+        return None, None
+    key, view, *options = raw
+    entry = {'view': view}
+    for option in options:
+        name, sep, value = option.partition('=')
+        if sep:
+            name, value = name.strip(), value.strip()
+        # without '=', name is the whole field and value is already ''
+        entry.setdefault(name, value)
+    return key, entry
+
+def parsefield(line, i, n):
+    """Extract one ';'-terminated field of a mailcap entry.
+
+    Scans line from index i up to (not including) n and stops at the
+    first unescaped ';'. A backslash skips the character after it.
+    Returns (field text with surrounding whitespace stripped, index at
+    which scanning stopped).
+    """
+    pos = i
+    while pos < n and line[pos] != ';':
+        pos += 2 if line[pos] == '\\' else 1
+    return line[i:pos].strip(), pos
+
+# Part 3: using the database.
+
+def findmatch(caps, MIMEtype, key='view', filename="/dev/null", plist=[]):
+    """Find a match for a mailcap entry.
+
+    caps is a mailcap database as returned by getcaps(), MIMEtype the
+    type to look up, key the field that must be present in the entry,
+    filename the value substituted for %s and plist a list of
+    "name=value" strings used for %{name} substitutions (the shared
+    default list is only read, never modified).
+
+    Return a tuple containing the command line, and the mailcap entry
+    used; (None, None) if no match is found or if filename contains
+    unsafe characters (an UnsafeMailcapInput warning is issued then).
+    This may invoke the 'test' command of several matching entries
+    before deciding which entry to use.
+
+    """
+    if _find_unsafe(filename):
+        msg = "Refusing to use mailcap with filename %r. Use a safe temporary filename." % (filename,)
+        warnings.warn(msg, UnsafeMailcapInput)
+        return None, None
+    entries = lookup(caps, MIMEtype, key)
+    # XXX This code should somehow check for the needsterminal flag.
+    for e in entries:
+        if 'test' in e:
+            # subst() expects (field, MIMEtype, filename, plist); leaving
+            # out MIMEtype would shift filename and plist into the wrong
+            # slots and break %s / %t expansion of the test command
+            test = subst(e['test'], MIMEtype, filename, plist)
+            if test is None:
+                continue
+            if test and os.system(test) != 0:
+                continue
+        command = subst(e[key], MIMEtype, filename, plist)
+        if command is not None:
+            return command, e
+    return None, None
+
+def lookup(caps, MIMEtype, key=None):
+    """Return the entries of caps applicable to MIMEtype.
+
+    Collects the entries stored under the exact type and those stored
+    under the "<maintype>/*" wildcard, keeps only entries containing key
+    (when key is not None) and returns them ordered by lineno_sort_key().
+    """
+    wildcard = MIMEtype.split('/')[0] + '/*'
+    candidates = []
+    for name in (MIMEtype, wildcard):
+        if name in caps:
+            candidates = candidates + caps[name]
+    if key is not None:
+        candidates = [entry for entry in candidates if key in entry]
+    return sorted(candidates, key=lineno_sort_key)
+
+def subst(field, MIMEtype, filename, plist=[]):
+    """Expand the %-escapes of a mailcap command template.
+
+    Supported escapes: %% (literal percent), %s (filename), %t
+    (MIMEtype) and %{name} (value of the "name=value" item in plist,
+    looked up with findparam()). Any other %x is copied unchanged, and a
+    backslash copies the following character literally. plist is only
+    read, never modified.
+
+    Returns the expanded string, or None (after issuing an
+    UnsafeMailcapInput warning) when MIMEtype or a parameter value
+    contains characters rejected by _find_unsafe().
+    """
+    # XXX Actually, this is Unix-specific
+    res = ''
+    i, n = 0, len(field)
+    while i < n:
+        c = field[i]; i = i+1
+        if c != '%':
+            if c == '\\':
+                c = field[i:i+1]; i = i+1
+            res = res + c
+        else:
+            # Slice instead of indexing: for a template that ends in a
+            # lone '%' this yields '' and the '%' is emitted literally by
+            # the final else branch, rather than raising IndexError
+            c = field[i:i+1]; i = i+1
+            if c == '%':
+                res = res + c
+            elif c == 's':
+                res = res + filename
+            elif c == 't':
+                if _find_unsafe(MIMEtype):
+                    msg = "Refusing to substitute MIME type %r into a shell command." % (MIMEtype,)
+                    warnings.warn(msg, UnsafeMailcapInput)
+                    return None
+                res = res + MIMEtype
+            elif c == '{':
+                start = i
+                while i < n and field[i] != '}':
+                    i = i+1
+                name = field[start:i]
+                i = i+1
+                param = findparam(name, plist)
+                if _find_unsafe(param):
+                    msg = "Refusing to substitute parameter %r (%s) into a shell command" % (param, name)
+                    warnings.warn(msg, UnsafeMailcapInput)
+                    return None
+                res = res + param
+            # XXX To do:
+            # %n == number of parts if type is multipart/*
+            # %F == list of alternating type and filename for parts
+            else:
+                res = res + '%' + c
+    return res
+
+def findparam(name, plist):
+    """Return the value of parameter name from a list of "name=value" strings.
+
+    The name is compared case-insensitively; the first matching item
+    wins. Returns '' when no item matches.
+    """
+    prefix = name.lower() + '='
+    width = len(prefix)
+    return next(
+        (item[width:] for item in plist if item[:width].lower() == prefix),
+        '',
+    )
+
+# Part 4: test program.
+
+def test():
+    """Command-line self test.
+
+    Without arguments, prints the mailcap database via show(). Otherwise
+    the arguments are taken as (MIMEtype, file) pairs; for each pair the
+    matching 'view' command is looked up and executed with os.system(),
+    and a non-zero exit status is reported. An incomplete pair prints a
+    usage line and stops.
+    """
+    import sys
+    caps = getcaps()
+    if not sys.argv[1:]:
+        show(caps)
+        return
+    for i in range(1, len(sys.argv), 2):
+        args = sys.argv[i:i+2]
+        if len(args) < 2:
+            print("usage: mailcap [MIMEtype file] ...")
+            return
+        MIMEtype = args[0]
+        file = args[1]
+        command, e = findmatch(caps, MIMEtype, 'view', file)
+        if not command:
+            # report the MIME type that was looked up, not the builtin type
+            print("No viewer found for", MIMEtype)
+        else:
+            print("Executing:", command)
+            sts = os.system(command)
+            sts = os.waitstatus_to_exitcode(sts)
+            if sts:
+                print("Exit status:", sts)
+
+def show(caps):
+    """Print the mailcap file list followed by every mailcap entry.
+
+    If caps is empty or None the database is loaded with getcaps().
+    Entries are printed per MIME type in sorted order, one
+    "field value" line per field, with a blank line after each entry.
+    """
+    print("Mailcap files:")
+    for path in listmailcapfiles():
+        print("\t" + path)
+    print()
+    caps = caps or getcaps()
+    print("Mailcap entries:")
+    print()
+    for mimetype in sorted(caps):
+        print(mimetype)
+        for entry in caps[mimetype]:
+            for field in sorted(entry):
+                print(" %-15s" % field, entry[field])
+            print()
+
+if __name__ == '__main__':
+ test()
=====================================
server/pycommon/gmMimeLib.py
=====================================
@@ -11,12 +11,15 @@ __license__ = "GPL"
# stdlib
import sys
import os
-import mailcap
import mimetypes
import subprocess
import shutil
import logging
import io
+try:
+ import mailcap as _mailcap
+except (ImportError, ModuleNotFoundError): # Python 3.11 deprecated mailcap, in 3.13 it is gone ...
+ import _mailcap__copy as _mailcap
# GNUmed
@@ -111,8 +114,8 @@ def get_viewer_cmd(aMimeType = None, aFileName = None, aToken = None):
# and hope for the best - we certainly don't want the module default "/dev/null"
aFileName = """%s"""
- mailcaps = mailcap.getcaps()
- (viewer, junk) = mailcap.findmatch(mailcaps, aMimeType, key = 'view', filename = '%s' % aFileName)
+ mailcaps = _mailcap.getcaps()
+ (viewer, junk) = _mailcap.findmatch(mailcaps, aMimeType, key = 'view', filename = '%s' % aFileName)
# FIXME: we should check for "x-token" flags
_log.debug("<%s> viewer: [%s]" % (aMimeType, viewer))
@@ -128,8 +131,8 @@ def get_editor_cmd(mimetype=None, filename=None):
# and hope for the best - we certainly don't want the module default "/dev/null"
filename = """%s"""
- mailcaps = mailcap.getcaps()
- (editor, junk) = mailcap.findmatch(mailcaps, mimetype, key = 'edit', filename = '%s' % filename)
+ mailcaps = _mailcap.getcaps()
+ (editor, junk) = _mailcap.findmatch(mailcaps, mimetype, key = 'edit', filename = '%s' % filename)
# FIXME: we should check for "x-token" flags
=====================================
server/pycommon/gmPG2.py
=====================================
@@ -661,7 +661,9 @@ def get_db_fingerprint(conn=None, fname=None, with_dump=False, eol=None):
("SELECT setting FROM pg_settings WHERE name = 'server_version'", "Version (PG)"),
("SELECT setting FROM pg_settings WHERE name = 'server_encoding'", "Encoding (PG)"),
("SELECT setting FROM pg_settings WHERE name = 'lc_collate'", "LC_COLLATE (PG)"),
+ ("SELECT datcollate FROM pg_database WHERE datname = current_database()", "pg_database.datcollate (PG)"),
("SELECT setting FROM pg_settings WHERE name = 'lc_ctype'", "LC_CTYPE (PG)"),
+ ("SELECT datctype FROM pg_database WHERE datname = current_database()", "pg_database.datctype (PG)"),
("SELECT count(1) FROM dem.identity", "Patients"),
("SELECT count(1) FROM clin.encounter", "Contacts"),
("SELECT count(1) FROM clin.episode", "Episodes"),
@@ -726,6 +728,30 @@ def get_db_fingerprint(conn=None, fname=None, with_dump=False, eol=None):
outfile.close()
return fname
+#------------------------------------------------------------------------
+def user_needs_password_encryption_switch(user:str=None) -> bool:
+	"""Tell whether a database user should have the password re-encrypted.
+
+	Args:
+		user: database user to check; when empty or None the check is run for CURRENT_USER
+
+	Returns:
+		True only if gm.user_needs_md5_2_scramsha256_pwd_switch() reports
+		true for the user AND the cluster setting "password_encryption"
+		equals "scram-sha-256", False otherwise.
+
+	NOTE(review): the database function presumably reports whether the
+	stored password is still an md5 hash - confirm against its SQL definition.
+	"""
+	if user:
+		md5_check = 'SELECT gm.user_needs_md5_2_scramsha256_pwd_switch(%(usr)s)'
+	else:
+		md5_check = 'SELECT gm.user_needs_md5_2_scramsha256_pwd_switch(CURRENT_USER)'
+	rows, idx = run_ro_queries(queries = [{'cmd': md5_check, 'args': {'usr': user}}])
+	if not rows[0][0]:
+		return False
+
+	_log.debug('user [%s] uses md5 password', user)
+	# a switch is only worthwhile if the cluster hashes new passwords with SCRAM
+	scram_check = 'SELECT setting = %(val)s FROM pg_settings WHERE name = %(opt)s'
+	scram_args = {
+		'opt': 'password_encryption',
+		'val': 'scram-sha-256'
+	}
+	rows, idx = run_ro_queries(queries = [{'cmd': scram_check, 'args': scram_args}])
+	if rows[0][0]:
+		_log.debug('PostgreSQL cluster configured for SCRAM-SHA256 password encryption, re-encryption of user password recommended')
+		return True
+
+	return False
+
#------------------------------------------------------------------------
def get_current_user():
rows, idx = run_ro_queries(queries = [{'cmd': 'select CURRENT_USER'}])
@@ -2258,10 +2284,10 @@ def sanity_check_database_settings(hipaa:bool=False) -> (int, str):
'fsync': [['on'], 'data loss/corruption', True],
'full_page_writes': [['on'], 'data loss/corruption', False],
'lc_messages': [['C'], 'suboptimal error detection', False],
- 'password_encryption': [['on', 'md5', 'scram-sha-256'], 'breach of confidentiality', False],
- #u'regex_flavor': [[u'advanced'], u'query breakage', False], # 9.0 doesn't support this anymore, default now advanced anyway
+ 'password_encryption': [['scram-sha-256'], 'breach of confidentiality', False],
+ #u'regex_flavor': [[u'advanced'], u'query breakage', False], # hardwired in PG9+
'synchronous_commit': [['on'], 'data loss/corruption', False],
- 'sql_inheritance': [['on'], 'query breakage, data loss/corruption', True], # IF returned (<PG10): better be ON, if NOT returned (PG10): hardwired
+ #'sql_inheritance': [['on'], 'query breakage, data loss/corruption', True], # hardwired in PG10+
'ignore_checksum_failure': [['off'], 'data loss/corruption', False], # starting with PG 9.3
'track_commit_timestamp': [['on'], 'suboptimal auditing', False], # starting with PG 9.3
}
@@ -2861,6 +2887,17 @@ SELECT to_timestamp (foofoo,'YYMMDD.HH24MI') FROM (
with open('x-pg_temp_func.txt', 'w', encoding = 'utf8') as f:
f.write(__get_schema_structure_by_pg_temp_func())
+ #--------------------------------------------------------------------
+	def test_user_needs_password_encryption_switch(user:str=None):
+		"""Print the result of user_needs_password_encryption_switch() for a few users.
+
+		Checks 'any-staff' and None (meaning CURRENT_USER), preceded by
+		one extra user: the *user* argument if given, else the second
+		command line argument if there is one.
+		"""
+		candidates = ['any-staff', None]
+		if user is not None:
+			candidates.insert(0, user)
+		elif len(sys.argv) > 2:
+			# optional: do not fail with IndexError when no user was passed
+			candidates.insert(0, sys.argv[2])
+		request_login_params(setup_pool = True)
+		for candidate in candidates:
+			print('user [%s]' % candidate, user_needs_password_encryption_switch(candidate))
+
#--------------------------------------------------------------------
# run tests
@@ -2890,10 +2927,11 @@ SELECT to_timestamp (foofoo,'YYMMDD.HH24MI') FROM (
#test_row_locks()
#test_faulty_SQL()
#test_log_settings()
- test_get_db_fingerprint()
+ #test_get_db_fingerprint()
#test_schema_compatible()
#test_get_schema_structure()
#test___get_schema_structure()
#test_pg_temp_concat()
+ test_user_needs_password_encryption_switch()
# ======================================================================
=====================================
server/pycommon/gmTools.py
=====================================
@@ -24,6 +24,7 @@ import functools
import json
import shutil
import zipfile
+import importlib
import datetime as pydt
import re as regex
import xml.sax.saxutils as xml_tools
@@ -138,6 +139,8 @@ u_arrow2right_until_black_diamond = '\u291e' # ->*
u_kanji_yen = '\u5186' # Yen kanji
u_replacement_character = '\ufffd'
+u_padlock_closed = '\u1f512'
+u_padlock_open = '\u1f513'
u_link_symbol = '\u1f517'
@@ -883,7 +886,8 @@ def import_module_from_directory(module_path=None, module_name=None, always_remo
module_name = module_name[:-3]
try:
- module = __import__(module_name)
+ #module = __import__(module_name)
+ module = importlib.import_module(module_name)
except Exception:
_log.exception('cannot __import__() module [%s] from [%s]' % (module_name, module_path))
while module_path in sys.path:
@@ -2203,6 +2207,9 @@ second line\n
#-----------------------------------------------------------------------
def test_unicode():
print(u_link_symbol * 10)
+ print(u_padlock_open)
+ print(u_padlock_closed)
+ print('\u1F5DD')
#-----------------------------------------------------------------------
def test_xml_escape():
print(xml_escape_string('<'))
@@ -2461,7 +2468,7 @@ second line\n
#-----------------------------------------------------------------------
#test_coalesce()
#test_capitalize()
- #test_import_module()
+ test_import_module()
#test_mkdir()
#test_gmPaths()
#test_none_if()
@@ -2478,7 +2485,7 @@ second line\n
#test_xml_escape()
#test_strip_trailing_empty_lines()
#test_fname_stem()
- test_tex_escape()
+ #test_tex_escape()
#test_rst2latex_snippet()
#test_dir_is_empty()
#test_compare_dicts()
=====================================
server/sql/v21-v22/fixups/v22-gm-dbo_grants-fixup.sql
=====================================
@@ -0,0 +1,20 @@
+-- ==============================================================
+-- GNUmed database schema change script
+--
+-- License: GPL v2 or later
+-- Author: karsten.hilbert at gmx.net
+--
+-- ==============================================================
+\set ON_ERROR_STOP 1
+--set default_transaction_read_only to off;
+
+-- --------------------------------------------------------------
+-- in v17 add: ", INHERIT FALSE, SET FALSE"
+-- Make "gm-dbo" a member of each role listed below WITH ADMIN OPTION
+-- so that it can, in turn, grant and revoke membership in those roles.
+GRANT "gnumed_v22" TO "gm-dbo" WITH ADMIN OPTION;
+GRANT "gm-logins" TO "gm-dbo" WITH ADMIN OPTION;
+GRANT "gm-doctors" TO "gm-dbo" WITH ADMIN OPTION;
+GRANT "gm-public" TO "gm-dbo" WITH ADMIN OPTION;
+GRANT "gm-staff" TO "gm-dbo" WITH ADMIN OPTION;
+
+-- --------------------------------------------------------------
+select gm.log_script_insertion('v22-gm-dbo_grants-fixup.sql', '22.32');
=====================================
server/sql/v21-v22/fixups/v22-release_notes-fixup.sql
=====================================
@@ -17,29 +17,20 @@ INSERT INTO dem.message_inbox (
) VALUES (
(select pk from dem.staff where db_user = 'any-doc'),
(select pk_type from dem.v_inbox_item_type where type = 'memo' and category = 'administrative'),
- 'Release Notes for GNUmed 1.8.21 (database v22.31)',
- 'GNUmed 1.8.21 Release Notes:
+ 'Release Notes for GNUmed 1.8.23 (database v22.33)',
+ 'GNUmed 1.8.23 Release Notes:
- 1.8.21
+ 1.8.23
-FIX: startup: crash on fingerprinting v15+ servers [thanks gm-dbo]
+FIX: hyphenated module names failing in newer Python versions [thanks María]
- 1.8.20
+IMPROVED: UI: cfg: notebook tabs position [thanks María]
+IMPROVED: DB: warn on non-SCRAM passwords
-FIX: startup: crash on fingerprinting episodes in DB if gm-staff [thanks Maria]
-FIX: patient search: gm-staff shall not ensure patient-ness [thanks Maria]
+ 22.33
- 22.31
-
-FIX: crash on fingerprinting v15+ servers [thanks gm-dbo]
-
- 22.30
-
-FIX: unique constraint on identity+name with multiple names per identity [thanks Maria]
-FIX: gm-staff permissions on dem.v_pat_addresses [thanks Maria]
-FIX: gm-staff permissions on dem.v_message_inbox [thanks Maria]
-FIX: permissions on org/unit tables/views
+FIX: bootstrapping: gm-dbo cannot GRANT ... WITH ADMIN to itself
');
-- --------------------------------------------------------------
-select gm.log_script_insertion('v22-release_notes-fixup.sql', '22.31 at 1.8.21');
+select gm.log_script_insertion('v22-release_notes-fixup.sql', '22.33 at 1.8.23');
=====================================
server/sql/v21-v22/superuser/v22-gm-user_needs_md5_2_scramsha256_pwd_switch.sql
=====================================
@@ -0,0 +1,37 @@
+-- ==============================================================
+-- GNUmed database schema change script
+--
+-- License: GPL v2 or later
+-- Author: karsten.hilbert at gmx.net
+--
+-- ==============================================================
+\set ON_ERROR_STOP 1
+--set default_transaction_read_only to off;
+
+set check_function_bodies to on;
+
+-- --------------------------------------------------------------
+drop function if exists gm.user_needs_md5_2_scramsha256_pwd_switch(IN _user TEXT) cascade;
+
+-- Returns TRUE (and raises a NOTICE) if the role named _user has a
+-- stored password starting with "md5", FALSE otherwise, including
+-- when no such role exists or _user is NULL.
+--
+-- The function is SECURITY DEFINER, hence the search_path is pinned
+-- (pg_temp last) such that neither pg_authid nor the LIKE operator
+-- can be resolved to objects planted by the calling user.
+create or replace function gm.user_needs_md5_2_scramsha256_pwd_switch(IN _user TEXT)
+	returns boolean
+	language 'plpgsql'
+	security definer
+	set search_path = pg_catalog, pg_temp
+	as '
+BEGIN
+	PERFORM 1 FROM pg_catalog.pg_authid WHERE
+		rolname = _user
+			AND
+		rolpassword LIKE ''md5%'';
+	IF NOT FOUND THEN
+		RETURN FALSE;
+	END IF;
+	RAISE NOTICE ''gm.user_needs_md5_2_scramsha256_pwd_switch: account [%] needs to re-set password for encryption method switch'', _user;
+	RETURN TRUE;
+END;';
+
+COMMENT ON FUNCTION gm.user_needs_md5_2_scramsha256_pwd_switch(IN _user TEXT) IS
+'Check if a given user needs to renew the password for making the encryption method switch.';
+
+-- --------------------------------------------------------------
+select gm.log_script_insertion('v22-gm-user_needs_md5_2_scramsha256_pwd_switch.sql', '22.33');
View it on GitLab: https://salsa.debian.org/med-team/gnumed-server/-/commit/57cf220ef6b92f00190cb49b1fda74ce4f31c10d
--
View it on GitLab: https://salsa.debian.org/med-team/gnumed-server/-/commit/57cf220ef6b92f00190cb49b1fda74ce4f31c10d
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/debian-med-commit/attachments/20260301/9180cb97/attachment-0001.htm>
More information about the debian-med-commit
mailing list