# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
import configparser
import locale
import os
import re
import syslog
from collections import namedtuple
class WebConfigParsingError(Exception):
def __init__(self, message):
self.message = message
class WebConfigMissing(Exception):
def __init__(self, message):
self.message = message
SECHEAD = 'asection'
def load(path, case_sensitive=False, ignore_bad_encoding=False):
config = configparser.ConfigParser(allow_no_value=True,
interpolation=None,
strict=False)
if case_sensitive:
config.optionxform = str
if ignore_bad_encoding:
with open(path, 'rb') as f:
raw = f.read().decode(locale.getpreferredencoding(), 'replace')
else:
with open(path, 'r', encoding='utf-8') as f:
raw = f.read()
config.read_string(f'[{SECHEAD}]\n' + raw, source=path)
return dict(config.items(section=SECHEAD))
_QUOTES = "'", '"'
def _strip_escape_quotes_of_config_value(val: str) -> str:
"""
Strips single or double quote char only if the quote present from both sides.
"""
if val.startswith(_QUOTES) and val.endswith(_QUOTES):
return val[1:-1]
return val
def load_fast(path, delimiter="=", strip_quotes=False):
data = {}
with open(path, "r", encoding="utf-8", errors="surrogateescape") as f:
for line in f.readlines():
parts = line.split(delimiter, 1)
try:
key, value = parts
except ValueError:
# Skip broken lines
continue
value = value.strip()
value = (
_strip_escape_quotes_of_config_value(value)
if strip_quotes
else value
)
data[key.strip()] = value
return data
cache = {}
def load_once(path, ignore_errors=False):
"""
Read ini file once (cached) and return its content as dict
"""
try:
res = cache[path]
except KeyError:
try:
res = cache[path] = load(path)
except (IOError, configparser.Error):
if not ignore_errors:
raise
res = cache[path] = {}
return res
def change_settings(settings_dict, path, tmp_path=None):
if not tmp_path:
tmp_path = path + ".tmp"
used_keys = []
with (open(path, 'r', encoding='utf-8') as fin,
open(tmp_path, 'w', encoding='utf-8') as fout):
for line in fin:
stripped_line = line.strip()
if stripped_line and not stripped_line.startswith('#'):
key, _ = stripped_line.split('=', 1)
key = key.strip()
if key in settings_dict:
fout.write(f'{key}={settings_dict[key]}\n')
used_keys.append(key)
continue
fout.write(line)
with open(tmp_path, 'a', encoding='utf-8') as fout:
for key in settings_dict:
if key not in used_keys:
fout.write(f'{key}={settings_dict[key]}\n')
os.rename(tmp_path, path)
_NGINX_TOKENS_RE = re.compile(
r"""
(
# Comments
(:? \# .* $ )
# Single-, double-quoted strings and bare strings without whitespaces
| (:? "[^"\n]*?" )
| (:? '[^'\n]*?' )
| (:? [^"';\s\{\}]+ )
# Structural characters
| ;
| \{
| \}
| \n
)
""",
re.IGNORECASE | re.MULTILINE | re.VERBOSE,
)
def _ngx_tokenize(data):
tokens = (
match.group(0)
for match in _NGINX_TOKENS_RE.finditer(data)
if match and match.group(0)
)
# Explicitly ignore comments
return (tok for tok in tokens if not tok.startswith('#'))
def _ngx_take_until(it, val):
for tok in it:
if tok in val:
return
yield tok
def _ngx_take_until_block_end(it):
lvl = 1
for t in it:
if t == "{":
lvl += 1
elif t == "}":
lvl -= 1
if lvl < 1:
return
yield t
def _ngx_scan_block_info(block_tokens, need_fields):
"""Scan a block for required fields, skips nested blocks"""
info = {}
for tok in block_tokens:
# We need to skip until the end of inner block if it occurs
if tok == "{":
for _ in _ngx_take_until_block_end(block_tokens):
pass
# Now gather the value, the last occurrence is in priority
if tok in need_fields:
value_tokens = _ngx_take_until(block_tokens, ";\n")
info[tok] = list(value_tokens)
return info
def nginx_conf_loose_parser(data):
"""
Parse content of NGINX configuration in a manner tolerant to minor mistakes
and extract relevant fields from all `server` directives.
Relevant fields are:
- `server_name`
- `root` - returned as `document_root`
- `ssl` - if `listen` field contains "ssl" word
Doesn't handle interpolated values (ex. `${val}`) outside of quoted strings
"""
tokens = _ngx_tokenize(data)
for tok in tokens:
if tok != "server":
continue
# Nothing seems to be allowed between "server" directive and
# the opening of his block, so we just discard everything
# until first block opening seen
for _ in _ngx_take_until(tokens, "{"):
pass
# Limit further scan by the inside of block
block_tokens = _ngx_take_until_block_end(tokens)
# By using only `block_tokens` we ensure all blocks are properly delimited
info = _ngx_scan_block_info(block_tokens, ("server_name", "root", "listen"))
try:
server_name = info["server_name"]
root = info["root"]
except KeyError:
continue
if not server_name and not root:
continue
yield {
"server_name": _strip_escape_quotes_of_config_value(server_name[0]),
"document_root": _strip_escape_quotes_of_config_value(root[0]),
"ssl": "ssl" in info.get("listen", []),
}
def nginx_conf_parser(conf_file):
"""Parse NGINX config file, see `nginx_conf_loose_parser` for more details"""
if not os.path.isfile(conf_file):
raise WebConfigMissing(f'File does not exists {conf_file}')
dirty_data = read_unicode_file_with_decode_fallback(conf_file)
return list(nginx_conf_loose_parser(dirty_data))
def apache_conf_parser(conf_file):
if not os.path.isfile(conf_file):
raise WebConfigMissing(f'File does not exists {conf_file}')
conf_data = []
data_all = read_unicode_file_with_decode_fallback(conf_file).splitlines()
data = [i for i in data_all if re.search('^((?!#).)*$', i)]
ID = 0
enable = False
result = {}
vhost = []
while len(data) > 0:
out = data.pop(0)
if "<VirtualHost" in out:
ip_port = out.split()[1]
port = '0'
try:
ip, port = ip_port.split(':')
port = port.replace('>', '')
except ValueError:
ip = ip_port
vhost.append(ip)
vhost.append(port)
enable = True
continue
if "</VirtualHost>" in out:
result[ID] = vhost
ID+=1
enable = False
vhost = []
continue
if enable:
vhost.append(out)
continue
for value in result.values():
# result[i][0] is an IP
# result[i][1] is a port
data = {
'user' : None,
'server_name' : '',
'document_root' : '',
'server_alias' : None,
'port' : int(value[1]),
'ssl' : False,
}
for line in value:
if "ServerName" in line:
data['server_name'] = line.split()[1].strip().replace('www.', '')
continue
if "DocumentRoot" in line:
# remove all whitespaces (first strip) and also quotes (second one)
data['document_root'] = line.split()[1].strip().strip('"')
continue
if "ServerAlias" in line:
data['server_alias'] = ','.join(str(n) for n in line.split()[1:])
continue
if "SuexecUserGroup" in line:
data['user'] = line.split()[1].strip()
if "SSLEngine" in line:
data['ssl'] = line.split()[1].strip().lower() == 'on'
conf_data.append(data)
return conf_data
PamLVECfg = namedtuple('PamLVECfg', ['min_uid', 'cagefs_enabled', 'groups'])
def parse_pam_lve_config(configfile):
"""
Parse string like:
"session required pam_lve.so 500 1 group1,group2"
:param configfile: path to config file to parse
:type configfile: str
:return: PamLVECfg instance when pam_lve configuratiom is found, None otherwise
:rtype: namedtuple
:raises: IOError, ValueError
"""
with open(configfile, 'r', encoding='utf-8') as f:
for line in f:
if line.startswith('#'):
continue
s = line.split()
if len(s) >= 3 and s[2] == 'pam_lve.so':
# parse config string taking pam_lve defaults into account
min_uid = int(s[3]) if len(s) >= 4 else 500
cagefs_enabled = bool(int(s[4])) if len(s) >= 5 else False
groups = s[5].split(',') if len(s) >= 6 else ['wheel']
return PamLVECfg(min_uid, cagefs_enabled, groups)
# pam_lve line is not found in config file
return None
def read_unicode_file_with_decode_fallback(file_path: str) -> str:
with open(file_path, 'rb') as f:
raw_data = f.read()
try:
return raw_data.decode()
except UnicodeDecodeError:
syslog.syslog(
syslog.LOG_WARNING,
f'Failed to decode "{file_path}" content as utf-8 - loading with placeholders for invalid unicode sequences'
)
return raw_data.decode(errors='replace')