import asyncio
import logging
import os
import subprocess as su
from typing import Iterable, Union
from defence360agent.contracts.config import Core
from defence360agent.utils import check_run, run, OsReleaseInfo
logger = logging.getLogger(__name__)
DOS_PROTECTOR_SERVICE_NAME = "imunify360-dos-protection"
UAL_SERVICE_NAME = "imunify360-unified-access-logger"
PAM_SERVICE_NAME = "imunify360-pam"
AUDITD_SERVICE_NAME = "imunify-auditd-log-reader"
SCANLOGD_SERVICE_NAME = "imunify360-scanlogd"
def _apply_cmd(func):
async def wrapper(*args, **kwargs):
cmd = func(*args, **kwargs)
logger.debug("check_call(%r)", cmd)
await check_run(cmd)
return wrapper
async def _reset_failed_state(
services: Iterable[Union["_CentOs6", "_SystemctlBased"]]
):
for s in services:
await s.reset_failed()
await s.restart()
for _ in range(10):
if await s.is_active():
break
logger.warning(
"Service %s is still not active, sleep for %s seconds", s, 1
)
await asyncio.sleep(1)
class _CentOs6:
SVC_CTL_BIN = "/sbin/service"
_CHKCONFIG = "/sbin/chkconfig"
def __init__(self, service_name):
self._service_name = service_name
@_apply_cmd
def start(self):
return [self.SVC_CTL_BIN, self._service_name, "start"]
@_apply_cmd
def stop(self):
return [self.SVC_CTL_BIN, self._service_name, "stop"]
@_apply_cmd
def restart(self):
return [self.SVC_CTL_BIN, self._service_name, "restart"]
async def reset_failed(self):
"""Not implemented for Centos6"""
pass
@_apply_cmd
def enable(self, **kwargs):
return [self._CHKCONFIG, "--add", self._service_name]
async def is_enabled(self):
cmd = [self._CHKCONFIG, "--list", self._service_name]
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=su.PIPE, stderr=su.DEVNULL
)
out, _ = await proc.communicate()
rc = await proc.wait()
return rc == 0 and b":on" in out
def is_enabled_sync(self):
cmd = [self._CHKCONFIG, "--list", self._service_name]
cp = su.run(cmd, stdout=su.PIPE, stderr=su.DEVNULL)
return cp.returncode == 0 and b":on" in cp.stdout
@_apply_cmd
def disable(self):
return [self._CHKCONFIG, "--del", self._service_name]
mask = disable
unmask = enable
async def is_active(self):
cmd = [self.SVC_CTL_BIN, self._service_name, "status"]
exit_code, _, _ = await run(cmd)
return exit_code == 0
async def activate_socket_service(self):
if await self.is_enabled() and not await self.is_active():
await _reset_failed_state((self,))
def unit_exists(self):
cp = su.run(
[self._CHKCONFIG, "--list", self._service_name],
stdout=su.DEVNULL,
stderr=su.DEVNULL,
)
return cp.returncode == 0
class _SystemctlBased:
SVC_CTL_BIN = "systemctl"
def __init__(self, service_name):
self._service_name = service_name
@_apply_cmd
def start(self):
return [self.SVC_CTL_BIN, "start", self._service_name]
@_apply_cmd
def stop(self):
return [self.SVC_CTL_BIN, "stop", self._service_name]
@_apply_cmd
def restart(self):
return [self.SVC_CTL_BIN, "restart", self._service_name]
@_apply_cmd
def _enable_now(self, *, now: bool):
return [
self.SVC_CTL_BIN,
"enable",
*(["--now"] if now else []),
self._service_name,
]
async def enable(self, *, now: bool):
await self._enable_now(now=now)
# WARN: Ubuntu 16.04 demonstrates very special behavior of the
# `systemcl enable --now` command - if the unit is stopped it
# wouldn't be started. We need to handle that case.
# TODO: Remove this case on dropping support for Ubuntu 16.04.
osinfo = {}
try:
OsReleaseInfo.dict_from_file(osinfo)
except (FileNotFoundError, PermissionError):
return
if osinfo.get("ID", "").lower() != "ubuntu":
return
if osinfo.get("VERSION_ID", "") == "16.04":
await self.restart()
async def is_enabled(self):
cmd = [self.SVC_CTL_BIN, "is-enabled", self._service_name]
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=su.DEVNULL, stderr=su.DEVNULL
)
await proc.communicate()
rc = await proc.wait()
return rc == 0
def is_enabled_sync(self):
cmd = [self.SVC_CTL_BIN, "is-enabled", self._service_name]
rc = su.call(cmd, stdout=su.DEVNULL, stderr=su.DEVNULL)
return rc == 0
@_apply_cmd
def disable(self, *, now: bool):
return [
self.SVC_CTL_BIN,
"disable",
*(["--now"] if now else []),
self._service_name,
]
@_apply_cmd
def reload(self):
return [self.SVC_CTL_BIN, "reload", self._service_name]
@_apply_cmd
def mask(self):
"""
It was created for imunify360-webshield which required masking as far
This is no more relevant but let it stay
is started by 'Wants=' in imunify360.service
"""
return [self.SVC_CTL_BIN, "mask", self._service_name]
@_apply_cmd
def unmask(self):
"""
It was created for imunify360-webshield which required masking as far
This is no more relevant but let it stay
is started by 'Wants=' in imunify360.service
"""
return [self.SVC_CTL_BIN, "unmask", self._service_name]
async def is_active(self):
cmd = [self.SVC_CTL_BIN, "is-active", self._service_name]
exit_code, _, _ = await run(cmd)
return exit_code == 0
@_apply_cmd
def reset_failed(self):
return [self.SVC_CTL_BIN, "reset-failed", self._service_name]
async def activate_socket_service(self):
agent_service_socket = adaptor(f"{self._service_name}.socket")
if (
await agent_service_socket.is_enabled()
and not await agent_service_socket.is_active()
):
await _reset_failed_state((self, agent_service_socket))
def unit_exists(self):
cp = su.run(
[self.SVC_CTL_BIN, "cat", self._service_name],
stdout=su.DEVNULL,
stderr=su.DEVNULL,
)
return cp.returncode == 0
class _CentOs7(_SystemctlBased):
SVC_CTL_BIN = "/usr/bin/systemctl"
class _DebianUbuntu(_SystemctlBased):
SVC_CTL_BIN = "/bin/systemctl"
class MinidaemonService(_CentOs6):
def __init__(self, service_name="minidaemon"):
self._service_name = service_name
@_apply_cmd
def restart(self, service=None):
cmd = [self.SVC_CTL_BIN, self._service_name, "restart"] # restart all
if service:
cmd += [service] # restart specific child process
return cmd
def adaptor(service_name, *, include_centos6=True):
for a in (
_DebianUbuntu,
_CentOs7,
*((_CentOs6,) if include_centos6 else tuple()),
):
if os.path.exists(a.SVC_CTL_BIN):
return a(service_name)
else:
raise RuntimeError("Cannot instantiate appropriate adaptor.")
def imunify360_service():
return adaptor(Core.SVC_NAME)
def imunify360_dos_protector_service():
try:
return adaptor(DOS_PROTECTOR_SERVICE_NAME, include_centos6=False)
except RuntimeError:
logger.info("DOS Protector service is not available on this system")
return None
def imunify360_ual_service():
return adaptor(UAL_SERVICE_NAME)
def imunify360_pam_service():
return adaptor(PAM_SERVICE_NAME)
def imunify360_scanlogd_service():
return adaptor(SCANLOGD_SERVICE_NAME)
def imunify360_auditd_service():
unit = adaptor(AUDITD_SERVICE_NAME)
if unit.unit_exists():
return adaptor(AUDITD_SERVICE_NAME)
logger.info("Auditd-log-reader service is not available on this system")
return None