import logging
import os
import shutil
import json
from defence360agent.contracts.config import Core
from defence360agent.contracts.license import LicenseCLN
from defence360agent.utils import run_cmd_and_log, OsReleaseInfo, os_version
from defence360agent.subsys.features.abstract_feature import (
AbstractFeature,
FeatureError,
FeatureStatus,
)
from defence360agent import utils
from defence360agent.rpc_tools import exceptions
logger = logging.getLogger(__name__)
class KernelCare(AbstractFeature):
KC_PROPERTIES = "/var/imunify360/plesk-previous-kernelcare-stats.json"
KC_SCRIPT_URL = (
"https://repo.cloudlinux.com/kernelcare/kernelcare_install.sh"
)
LOG_DIR = "/var/log/%s" % Core.PRODUCT
NAME = "KernelCare"
BIN_PATH = "/usr/bin/kcarectl"
INSTALL_LOG_FILE_MASK = "%s/install-kernelcare.log.*" % LOG_DIR
REMOVE_LOG_FILE_MASK = "%s/remove-kernelcare.log.*" % LOG_DIR
INSTALL_CMD = "curl -s %s | bash" % KC_SCRIPT_URL
REMOVE_CMD_REDHAT = "yum remove -y kernelcare"
REMOVE_CMD_DEBIAN = "apt-get -y remove kernelcare"
_CMD_LIST = [INSTALL_CMD, REMOVE_CMD_REDHAT, REMOVE_CMD_DEBIAN]
STATUS_MESSAGE = {
0: "Host is updated to the latest patch level",
1: "There are no applied patches",
2: "There are new not applied patches",
3: "Kernel is unsupported",
}
async def _check_installed_impl(self) -> bool:
return os.path.exists(self.BIN_PATH)
async def status(self):
"""
:raises FeatureError: if kernelcare returns unexpected error
:return: str: feature's current status
"""
status = await super().status()
is_feature_installed = (
status["items"]["status"] == FeatureStatus.INSTALLED
)
if not is_feature_installed:
return status
ret, out, err = await self.get_output_kcarectl("--status")
try:
# EDF is obsolete since 6.1
status["items"]["edf_supported"] = False
status["items"]["message"] = self.STATUS_MESSAGE[ret]
except KeyError:
raise FeatureError(
"Unknown error occured while getting status from kcarectl. "
f"stdout: [{out}], stderr: [{err}], return code: [{ret}]"
)
return status
@AbstractFeature.raise_if_shouldnt_install_now
async def install(self):
return await run_cmd_and_log(
self.INSTALL_CMD,
self.INSTALL_LOG_FILE_MASK,
env=dict(os.environ, DEBIAN_FRONTEND="noninteractive"),
)
@AbstractFeature.raise_if_shouldnt_remove_now
async def remove(self):
if OsReleaseInfo.id_like() & OsReleaseInfo.DEBIAN:
command = self.REMOVE_CMD_DEBIAN
else:
command = self.REMOVE_CMD_REDHAT
return await run_cmd_and_log(command, self.REMOVE_LOG_FILE_MASK)
async def get_plugin_info(self):
try:
output = await self.run_kcarectl("--plugin-info", "--json")
except FileNotFoundError:
raise exceptions.RpcError("kcarectl not found")
except utils.CheckRunError as e:
if not (e.returncode == 2 and b"--json" in e.stderr):
raise # reraise as is
else: # unrecognized arguments: --json
# use RpcError, to get an error
raise exceptions.RpcError(
"Your kcarectl version doesn't support --json option."
" Please, update to kernelcare-2.15-2 or newer."
)
try:
results = json.loads(output.decode().partition("--START--")[-1])
except ValueError:
raise exceptions.RpcError(
"Can't decode kcarectl output as json."
" Try updating to the latest kernelcare version."
)
return results
async def run_kcarectl(self, *options):
return await utils.check_run((self.BIN_PATH,) + options)
async def get_output_kcarectl(self, *options):
ret, out, err = await utils.run([self.BIN_PATH, *options])
return ret, out.decode(), err.decode()