"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import json
import logging
import pwd
import shutil
import time
from abc import ABC
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional
import psutil
from defence360agent.contracts.messages import Message, MessageType
from defence360agent.internals.global_scope import g
from defence360agent.internals.the_sink import TheSink
from defence360agent.subsys.panels import hosting_panel
from defence360agent.utils import rmtree
from defence360agent.utils.threads import to_thread
from imav.malwarelib.config import (
ExitDetachedScanType,
MalwareScanResourceType,
)
from imav.malwarelib.scan import (
ScanAlreadyCompleteError,
ScanInfoError,
)
from imav.malwarelib.scan.detached import (
DetachedDir,
DetachedOperation,
DetachedScan,
DetachedState,
)
from imav.malwarelib.scan.mds import MDS, MDS_PATH
from imav.malwarelib.scan.mds.report import (
MalwareDatabaseHitInfo,
scan_report,
)
from imav.malwarelib.scan.utils import trim_file_content
# Maximum time, in seconds, to keep retrying to read the scan pid before
# giving up on killing the process (see kill_running_scan_process).
MDS_PID_WAIT_TIME = 30
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class DetachedOperationFailed(Exception):
    """Raised when a detached MDS operation report contains errors."""
class MDSDetachedScanDir(DetachedDir):
    """Working directory layout of a detached MDS scan.

    ``FILES`` extends the base mapping with the scan-specific entries
    (attribute name -> file name).
    NOTE(review): presumably ``DetachedDir`` resolves these entries into the
    annotated ``Path`` attributes - confirm against the base class.
    """

    DETACHED_DIR = "/var/imunify360/dbscan/run/scan"

    ignore_file: Path
    report_file: Path
    scan_info_file: Path

    FILES = dict(
        DetachedDir.FILES,
        ignore_file="ignore",
        report_file="report_file",
        scan_info_file="scan_info.json",
    )
class MDSDetachedCleanupDir(DetachedDir):
    """Working directory layout of a detached MDS cleanup.

    ``FILES`` extends the base mapping with the ``report_file`` entry
    (attribute name -> file name).
    """

    DETACHED_DIR = "/var/imunify360/dbscan/run/clean"

    report_file: Path

    FILES = dict(DetachedDir.FILES, report_file="report_file")
class MDSDetachedRestoreDir(DetachedDir):
    """Working directory layout of a detached MDS restore.

    ``FILES`` extends the base mapping with the ``report_file`` entry
    (attribute name -> file name).
    """

    DETACHED_DIR = "/var/imunify360/dbscan/run/restore"

    report_file: Path

    FILES = dict(DetachedDir.FILES, report_file="report_file")
@dataclass
class DbScanInfo:
    """Metadata describing a detached database scan.

    ``cmd`` is the scanner command line, ``scan_type`` and ``initial_path``
    describe what was requested (``None`` when unknown), and ``started`` is
    the scan start timestamp (``0`` when unknown).
    """

    cmd: List[str]
    # Optional: both default to None when the scan info is unavailable.
    scan_type: Optional[str] = None
    initial_path: Optional[str] = None
    # Filled from ``st_mtime`` of the scan directory, which is a float.
    started: float = 0
class MDSDetachedOperation(DetachedOperation, ABC):
    """Common base class for detached MDS operations."""

    def get_detached_process_state(self, start_time=0) -> str:
        """Return the parent's process state; ``start_time`` defaults to 0."""
        state = super().get_detached_process_state(start_time=start_time)
        return state
class MDSDetachedScan(MDSDetachedOperation, DetachedScan):
    """Detached MDS database scan.

    Reads progress and reports from the scan's detached directory, builds
    the ``MalwareDatabaseScan`` message on completion and handles aborted
    or stopped scans.
    """

    RESOURCE_TYPE = MalwareScanResourceType.DB
    DETACHED_DIR_CLS = MDSDetachedScanDir

    def _is_scan_finished(self):
        """Return True if the detached dir's ``done_file`` exists."""
        return self.detached_dir.done_file.exists()

    def _get_progress_info(self):
        """Return the parsed progress file as a dict.

        An empty dict is returned when the file is missing, is not valid
        JSON, or its top-level value is not a JSON object.
        """
        try:
            with self.detached_dir.progress_file.open() as fp:
                info = json.load(fp)
        except (FileNotFoundError, json.JSONDecodeError):
            return {}
        # Callers subscript the result by key, so anything but a dict is
        # treated the same as "no progress information yet".
        return info if isinstance(info, dict) else {}

    @property
    def progress(self):
        """Scan progress as an integer taken from ``progress_main``.

        When the value is missing or cannot be converted to an integer,
        100 is returned for a finished scan and 0 otherwise.
        """
        progress_info = self._get_progress_info()
        try:
            return int(float(progress_info["progress_main"]))
        except (KeyError, TypeError, ValueError, OverflowError):
            # Missing key, null/non-numeric value, NaN or Infinity.
            return 100 if self._is_scan_finished() else 0

    @property
    def phase(self):
        """Human-readable name of the current scan phase."""
        if self.progress == 0:
            return "avd scanning"
        return "{} scanning".format(MDS)

    @property
    def total_resources(self):
        """Value of ``total_db_count`` as an int, or 0 when unavailable."""
        try:
            return int(self._get_progress_info()["total_db_count"])
        except (KeyError, TypeError, ValueError, OverflowError):
            return 0

    def _load_scan_info(self) -> Optional[DbScanInfo]:
        """Load scan metadata from ``scan_info_file``.

        The start time is taken from the modification time of the scan
        directory.

        :return: scan info, or None if the file (or directory) is missing
            or the file is not valid JSON
        :raises KeyError: if the file lacks ``cmd``, ``scan_type`` or
            ``initial_path``
        """
        try:
            with self.detached_dir.scan_info_file.open() as fp:
                info = json.load(fp)
            started = self.detached_dir.path.stat().st_mtime
        except (FileNotFoundError, json.JSONDecodeError):
            return None
        return DbScanInfo(
            cmd=info["cmd"],
            scan_type=info["scan_type"],
            initial_path=info["initial_path"],
            started=started,
        )

    def _load_single_report(self, report_file):
        """Return the parsed JSON content of one report file."""
        with report_file.open() as f:
            return json.load(f)

    @classmethod
    def process_is_suitable(cls, proc) -> bool:
        """Tell whether *proc* is an MDS process started with ``--scan``."""
        if proc:
            cmdline = proc.cmdline()
            return MDS_PATH in cmdline and "--scan" in cmdline
        return False

    async def complete(self) -> MessageType.MalwareDatabaseScan:
        """Build the final scan message from the ``report*.json`` files.

        Every loaded report is also sent to ``g.sink`` as an ``MDSReport``
        message.

        :return: scan message updated either with the aggregated report or
            with an error (unparsable reports / no reports found)
        :raises ScanInfoError: if the scan info cannot be loaded
        :raises ScanAlreadyCompleteError: if a report file disappears
            while being read
        """
        if (scan_info := self._load_scan_info()) is None:
            raise ScanInfoError
        message = MessageType.MalwareDatabaseScan(
            args=scan_info.cmd,
            path=scan_info.initial_path,
            scan_id=self.detached_id,
            type=scan_info.scan_type,
        )
        try:
            hit_report_list = [
                self._load_single_report(report_file)
                for report_file in self.detached_dir.path.glob("report*.json")
            ]
        except FileNotFoundError as e:
            raise ScanAlreadyCompleteError from e
        except Exception as e:
            logger.exception("Unable to parse MDS report")
            message.update_with_error(str(e))
            return message

        if not hit_report_list:
            message.update_with_error("No reports found")
            return message

        for report in hit_report_list:
            report_msg = MessageType.MDSReport(
                report, scan_id=self.detached_id
            )
            await g.sink.process_message(report_msg)

        result_report = await scan_report(hit_report_list, self.detached_id)
        message.update_with_report(result_report)
        return message

    async def kill_running_scan_process(self, timer=time.monotonic):
        """Kill the running MDS scan process, if it can be found.

        The pid is polled once per second for up to ``MDS_PID_WAIT_TIME``
        seconds; if it never becomes readable the process is assumed to be
        dead. The process is only killed when ``process_is_suitable``
        accepts it.

        :param timer: clock used to compute the deadline
        """
        error = None
        deadline = timer() + MDS_PID_WAIT_TIME
        while timer() < deadline:
            try:
                pid = self.get_pid()
                break
            except (FileNotFoundError, ValueError) as err:
                await asyncio.sleep(1)
                error = err
        else:
            # The loop ran out of time without ever obtaining a pid.
            logger.warning(
                "Cannot find the mds process to kill (%s): %r."
                " Assuming it's already dead.",
                self.detached_id,
                error,
            )
            return

        try:
            proc = psutil.Process(pid)
            if self.process_is_suitable(proc):
                proc.kill()
        except psutil.Error as err:
            logger.warning(
                "Problem when killing the running mds process: %s", err
            )

    async def handle_aborted_process(
        self,
        *,
        sink,
        exit_type: str = ExitDetachedScanType.ABORTED,
        kill: bool = True,
        scan_path: Optional[str] = None,
        scan_type: Optional[str] = None,
        scan_started: Optional[float] = None,
        cmd: Optional[List[str]] = None,
        out: str = "",
        err: str = "",
    ) -> None:
        """Report an aborted/stopped detached scan and remove its directory.

        - Kills the scan process, if requested and it exists
        - Sends a MalwareDatabaseScan message with ``error=exit_type``;
          values not passed explicitly are taken from the stored scan info
        - Sends a ScanFailed message (in case of 'ABORTED')
        - Deletes the scan directory

        :param sink: the sink to send messages
        :param exit_type: 'ABORTED' by default,
                            if stopped by user, then 'STOPPED'
        :param kill: try to kill a process
        :param scan_path: which path was scanned
        :param scan_type: what is the scan's type
        :param scan_started: when was the scan started (if known)
        :param cmd: command line arguments
        :param out: command stdout
        :param err: command stderr
        """
        if kill:
            await self.kill_running_scan_process()

        # Explicit arguments win; stored scan info is only a fallback.
        scan_info = self._load_scan_info() or DbScanInfo(cmd=[])
        cmd = cmd or scan_info.cmd
        scan_path = scan_path or scan_info.initial_path
        scan_type = scan_type or scan_info.scan_type
        scan_started = int(scan_started or scan_info.started)

        await sink.process_message(
            MessageType.MalwareDatabaseScan(
                args=cmd,
                error=exit_type,
                path=scan_path,
                scan_id=self.detached_id,
                type=scan_type,
                started=scan_started,
                completed=int(time.time()),
            )
        )

        scan_dir = self.detached_dir
        if exit_type == ExitDetachedScanType.ABORTED:
            # Prefer the log files' content over the passed-in output.
            stdout = trim_file_content(scan_dir.log_file) or out
            stderr = trim_file_content(scan_dir.err_file) or err
            msg = MessageType.ScanFailed()
            msg["out"] = stdout
            msg["err"] = stderr
            logger.warning(
                "Scan %s was aborted: %s, %s", self.detached_id, stdout, stderr
            )
            msg["command"] = cmd
            msg["message"] = "aborted"
            msg["scan_id"] = self.detached_id
            msg["path"] = scan_path
            await sink.process_message(msg)

        # Remove exactly the directory that was checked.
        if not scan_dir.path.is_dir():
            logger.warning("No such directory: %s", scan_dir.path)
        else:
            rmtree(str(scan_dir.path))
class MDSDetachedMutableOperation(MDSDetachedOperation, ABC):
    """Parsing of operations that can succeed or fail for any DB hit"""

    # Message classes used for the result; set by concrete subclasses.
    SUCCESS_MSG = None
    FAIL_MSG = None

    async def complete(self) -> Message:
        """Build the result message from the ``report*.json`` files.

        On success every loaded report is also sent to ``g.sink`` as an
        ``MDSReport`` message.

        :return: ``SUCCESS_MSG`` with the succeeded/failed hits, or
            ``FAIL_MSG`` when a report contains errors, cannot be parsed
            as JSON, or no reports were found
        :raises ScanAlreadyCompleteError: on FileNotFoundError while
            loading or parsing the reports
        """
        try:
            hit_report_list = [
                self._load_single_report(report)
                for report in self.detached_dir.path.glob("report*.json")
            ]
            message = await self._parse_report(hit_report_list)
        except FileNotFoundError as e:
            raise ScanAlreadyCompleteError from e
        except (DetachedOperationFailed, json.JSONDecodeError) as e:
            # A malformed report file is reported as a failed operation
            # instead of escaping as an unhandled exception.
            logger.exception("Unable to parse MDS %s report", self.NAME)
            return self.FAIL_MSG(error=str(e))

        if not hit_report_list:
            return self.FAIL_MSG(error=f"No {self.NAME} reports found")

        for report in hit_report_list:
            report_msg = MessageType.MDSReport(
                report, scan_id=self.detached_id
            )
            await g.sink.process_message(report_msg)
        return self.SUCCESS_MSG(**message)

    @staticmethod
    def _load_single_report(report_file):
        """Return the parsed JSON content of one report file."""
        with report_file.open() as f:
            return json.load(f)

    async def _parse_report(self, hit_report_list) -> dict:
        """Split report hits into succeeded and failed sets.

        A report with an empty ``rows_with_error`` counts as succeeded,
        otherwise as failed.

        :param hit_report_list: parsed report dicts
        :return: ``{"succeeded": set, "failed": set}`` of
            ``MalwareDatabaseHitInfo``
        :raises DetachedOperationFailed: if any report has a non-empty
            ``error_list``
        """
        users_from_panel = set(await hosting_panel.HostingPanel().get_users())
        pw_all = await to_thread(pwd.getpwall)
        succeeded = set()
        failed = set()
        for report in hit_report_list:
            if errors := report["error_list"]:
                logger.error("Errors in MDS %s: %s", self.NAME, errors)
                raise DetachedOperationFailed(errors)
            target = failed if report["rows_with_error"] else succeeded
            target.add(
                MalwareDatabaseHitInfo.from_report(
                    report, users_from_panel, pw_all, self.detached_id
                )
            )
        return {"succeeded": succeeded, "failed": failed}
class MDSDetachedCleanup(MDSDetachedMutableOperation):
    """Detached MDS cleanup operation."""

    NAME = "cleanup"
    DETACHED_DIR_CLS = MDSDetachedCleanupDir
    SUCCESS_MSG = MessageType.MalwareDatabaseCleanup
    FAIL_MSG = MessageType.MalwareDatabaseCleanupFailed

    @property
    def on_complete_message(self) -> MessageType.MalwareCleanComplete:
        """Message announcing that this cleanup has completed."""
        return MessageType.MalwareCleanComplete(scan_id=self.detached_id)

    async def handle_aborted_process(
        self,
        *,
        sink: TheSink,
        exit_type: str = ExitDetachedScanType.ABORTED,
        scan_path: Optional[str] = None,
    ) -> None:
        """Report an aborted cleanup to *sink* and remove its directory.

        :param sink: the sink to send the failure message to
        :param exit_type: must be 'ABORTED'
        :param scan_path: path included in the error text
        """
        assert exit_type == ExitDetachedScanType.ABORTED, (
            "Cleanup cannot be stopped, only aborted status is supported"
        )
        # NOTE: No need to kill running process because Imunify360 does
        # it through systemd and we don't support MDS for Imunify AV.
        logger.info("Cleanup %s was %s", self.detached_id, exit_type)

        detached_dir = self.detached_dir
        stdout = trim_file_content(detached_dir.log_file)
        stderr = trim_file_content(detached_dir.err_file)
        error_text = (
            f"path: {scan_path}, "
            f"detached_id: {self.detached_id}, "
            f"out: {stdout}, "
            f"err: {stderr}"
        )
        failure = MessageType.MalwareDatabaseCleanupFailed(error=error_text)
        await sink.process_message(failure)

        # Best-effort removal: errors while deleting are ignored.
        shutil.rmtree(str(self.detached_dir.path), ignore_errors=True)

    @classmethod
    def process_is_suitable(cls, proc) -> bool:
        """Tell whether *proc* is an MDS process started with ``--clean``."""
        if not proc:
            return False
        cmdline = proc.cmdline()
        return MDS_PATH in cmdline and "--clean" in cmdline
class MDSDetachedRestore(MDSDetachedMutableOperation):
    """Detached MDS restore operation."""

    NAME = "restore"
    DETACHED_DIR_CLS = MDSDetachedRestoreDir
    SUCCESS_MSG = MessageType.MalwareDatabaseRestore
    FAIL_MSG = MessageType.MalwareDatabaseRestoreFailed

    @classmethod
    def process_is_suitable(cls, proc) -> bool:
        """Tell whether *proc* is an MDS process started with ``--restore``."""
        if not proc:
            return False
        cmdline = proc.cmdline()
        return MDS_PATH in cmdline and "--restore" in cmdline

    @property
    def on_complete_message(self) -> MessageType.MalwareRestoreComplete:
        """Message announcing that this restore has completed."""
        return MessageType.MalwareRestoreComplete(scan_id=self.detached_id)

    async def handle_aborted_process(
        self,
        *,
        sink: TheSink,
        exit_type: str = ExitDetachedScanType.ABORTED,
        scan_path: Optional[str] = None,
    ) -> None:
        """Report an aborted restore to *sink* and remove its directory.

        :param sink: the sink to send the failure message to
        :param exit_type: must be 'ABORTED'
        :param scan_path: path included in the error text
        """
        assert exit_type == ExitDetachedScanType.ABORTED, (
            "Restore cannot be stopped, only aborted status is supported"
        )
        logger.info("Restore %s was %s", self.detached_id, exit_type)

        detached_dir = self.detached_dir
        stdout = trim_file_content(detached_dir.log_file)
        stderr = trim_file_content(detached_dir.err_file)
        error_text = (
            f"path: {scan_path}, "
            f"detached_id: {self.detached_id}, "
            f"out: {stdout}, "
            f"err: {stderr}"
        )
        failure = MessageType.MalwareDatabaseRestoreFailed(error=error_text)
        await sink.process_message(failure)

        # Best-effort removal: errors while deleting are ignored.
        shutil.rmtree(str(self.detached_dir.path), ignore_errors=True)