# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
import os
import re
import subprocess
import sys
CAGEFS_MP_FILENAME = '/etc/cagefs/cagefs.mp'
CAGEFSCTL_TOOL = '/usr/sbin/cagefsctl'
class CagefsMpConflict(Exception):
def __init__(self, new_item, existing_item):
self._msg = (
f"Conflict in adding '{new_item}' to {CAGEFS_MP_FILENAME} "
f"because of pre-existing alternative specification: '{existing_item}'"
)
def __str__(self):
return self._msg
class CagefsNotSupportedError(Exception):
"""Cagefs Not Supported Exception"""
def __init__(self, message):
Exception.__init__(self, message)
class CagefsMpItem:
PREFIX_LIST = '@!%*'
_PREFIX_MOUNT_RW = ''
_PREFIX_MOUNT_RO = '!'
def __init__(self, arg):
"""Constructor
:param arg: Is either path to add to cagefs.mp or a raw line is read from cagefs.mp
:param prefix: The same as adding prefix '!' to arg before passing it to ctor"""
if arg[:1] == '#': # is a comment? then init as dummy
self._path_spec = None
elif arg.strip() == '': # init as dummy for empty lines
self._path_spec = None
else:
self._path_spec = arg
def mode(self, mode):
"""Specify mode as in fluent constructor"""
if self.prefix() == '@' and mode is not None:
self._path_spec = f"{self._path_spec},{mode:03o}"
return self
def __str__(self):
return self._path_spec
@staticmethod
def _add_slash(path):
if path == '':
return '/'
if path[-1] != '/':
return path + '/'
return path
def pre_exist_in(self, another):
adopted = CagefsMpItem._adopt(another)
# overkill: just to keep strictly to comparing NULL objects principle
if self.is_dummy() or adopted.is_dummy():
return False
this_path = CagefsMpItem._add_slash(self.path())
test_preexist_in_path = CagefsMpItem._add_slash(adopted.path())
return this_path.startswith(test_preexist_in_path)
def is_compatible_by_prefix_with(self, existing):
adopted = CagefsMpItem._adopt(existing)
# overkill: just to keep strictly to comparing NULL objects principle
if self.is_dummy() or adopted.is_dummy():
return False
if self.prefix() == adopted.prefix():
return True
prefix_compatibility_map = { CagefsMpItem._PREFIX_MOUNT_RW : [CagefsMpItem._PREFIX_MOUNT_RO] }
null_options = []
return self.prefix() in prefix_compatibility_map.get(adopted.prefix(), null_options)
def is_dummy(self):
return self._path_spec is None
@staticmethod
def _adopt(x):
if isinstance(x, CagefsMpItem):
return x
else:
return CagefsMpItem(x)
@staticmethod
def _cut_off_mode(path_spec):
"""Cut off mode from path spec like @/var/run/screen,777
Only one comma per path spec is allowed ;-)"""
return path_spec.split(',')[0]
@staticmethod
def _cut_off_prefix(path_spec):
return path_spec.lstrip(CagefsMpItem.PREFIX_LIST)
def path(self):
return CagefsMpItem._cut_off_prefix(CagefsMpItem._cut_off_mode(self._path_spec))
def prefix(self):
if self._path_spec != self.path():
return self._path_spec[0]
else:
return ''
def is_cagefs_present():
return os.path.exists(CAGEFSCTL_TOOL)
def _mk_mount_dir_setup_perm(path, mode=0o755, owner_id=None, group_id=None):
# -1 means 'unchanged'
if group_id is None:
group_id = -1
if owner_id is None:
owner_id = -1
if not os.path.isdir(path):
os.mkdir(path)
if mode is not None:
os.chmod(path, mode)
os.chown(path, owner_id, group_id)
def _remount_cagefs(user=None, remount_in_background=False):
if not is_cagefs_present():
return
if user is None:
command = [CAGEFSCTL_TOOL, "--wait-lock", "--remount-all"]
else:
command = [CAGEFSCTL_TOOL, "--remount", user]
if remount_in_background:
subprocess.Popen( # pylint: disable=consider-using-with
command,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
else:
subprocess.run(
command,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
def setup_mount_dir_cagefs(path, added_by=None, mode=0o755, owner_id=None, group_id=None,
prefix='', remount_cagefs=False, remount_in_background=False):
"""
Add mount point to /etc/cagefs/cagefs.mp
:param path: Directory path to be added in cagefs.mp and mounted
from within setup_mount_dir_cagefs().
If this directory does not exist, then it is created.
:param added_by: package or component, mount dir relates to, or whatever will
stay in cagefs.mp with "# added by..." comment
:param mode: If is not None: Regardless of whether directory exists or not prior this call,
it's permissions will be set to mode.
:param owner_id: Regardless of whether directory exists or not prior this call,
it's owner id will be set to.
If None, the owner won't be changed.
:param group_id: Regardless of whether directory exists or not prior this call,
it's group id will be set to.
If None, the group won't be changed.
:param prefix: Mount point prefix. Default is mount as RW.
Pass '!' to add read-only mount point.
Refer CageFS section at http://docs.cloudlinux.com/ for more options.
:param remount_cagefs: If True, cagefs skeleton will be automatically
remounted to apply changes.
:param remount_in_background: If True, cagefs remount will be done in separate
background process, without waiting for completion
:returns: None
Propagates native EnvironmentError if no CageFS installed or something else goes wrong.
Raises CagefsMpConflict if path is already specified in cagefs.mp, but in a way which is opposite
to mount_as_readonly param.
"""
_mk_mount_dir_setup_perm(path, mode, owner_id, group_id)
if not is_cagefs_present():
return
# Create cagefs.mp if absent. It will be merged when cagefsctl --init.
if not os.path.exists(CAGEFS_MP_FILENAME):
subprocess.run(
[CAGEFSCTL_TOOL, "--create-mp"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
add_new_line_to_cagefs_mp()
# ^^
# Hereafter we will not care if there was
# 'no newline at the end of file'
with open(CAGEFS_MP_FILENAME, "r+", encoding="utf-8") as f:
new_item = CagefsMpItem(prefix + path).mode(mode)
trim_nl_iter = (file_line.rstrip() for file_line in f)
pre_exist_option = [x for x in trim_nl_iter if new_item.pre_exist_in(x)]
if not pre_exist_option:
f.seek(0, 2) # 2: seek to the end of file
if added_by is not None:
# no newline is allowed
added_by = added_by.strip()
print("# next line is added by ", added_by, file=f)
print(new_item, file=f)
if remount_cagefs:
_remount_cagefs(remount_in_background=remount_in_background)
elif not new_item.is_compatible_by_prefix_with(pre_exist_option[-1]):
raise CagefsMpConflict(new_item, pre_exist_option[-1])
def _get_cagefs_mp_lines():
with open(CAGEFS_MP_FILENAME, "r", encoding="utf-8") as f:
return f.readlines()
def _write_cagefs_mp_lines(lines):
with open(CAGEFS_MP_FILENAME, "w", encoding="utf-8") as f:
return f.writelines(lines)
# next line is added by
def add_new_line_to_cagefs_mp():
"""
Add new line to the end of /etc/cagefs/cagefs.mp file if it is not there
"""
lines = _get_cagefs_mp_lines()
# file is not empty and last line in the file does not end with new line symbol ?
if lines and lines[0] != '' and lines[-1][-1] != '\n':
lines[-1] += '\n'
_write_cagefs_mp_lines(lines)
def remove_mount_dir_cagefs(path, remount_cagefs=False, remount_in_background=False):
"""
Remove mount points matching given path from cagefs.mp file
:param str path: Path that should be removed from file.
:param bool remount_cagefs: Remount cagefs skeleton or not
:param remount_in_background: If True, cagefs remount will be done in separate
background process, without waiting for completion
:return: Nothing
"""
lines = _get_cagefs_mp_lines()
r = re.compile(rf'^[{CagefsMpItem.PREFIX_LIST}]?{re.escape(path)}(,\d+)?$')
lines_with_excluded_path = [line for line in lines if not r.match(line)]
# nothing to do if nothing found by pattern
if len(lines) == len(lines_with_excluded_path):
return
_write_cagefs_mp_lines(lines_with_excluded_path)
if remount_cagefs:
_remount_cagefs(remount_in_background=remount_in_background)
def in_cagefs():
"""If this folder /var/.cagefs exists, it means process is inside cagefs"""
return os.path.isdir('/var/.cagefs')
def _is_cagefs_enabled(user):
"""
Check that cagefs enabled for user
"""
try:
cagefs_lib_dir = '/usr/share/cagefs/'
if cagefs_lib_dir not in sys.path:
sys.path.append(cagefs_lib_dir)
import cagefsctl # pylint: disable=import-outside-toplevel
except ImportError:
return False
try:
if not cagefsctl.is_user_enabled(user):
return False
except AttributeError as e:
raise CagefsNotSupportedError(
'ERROR: CageFS version is unsupported. Please update CageFS.'
) from e
return True