"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import json
import logging
import shutil
import time
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional
import psutil
from imav.malwarelib.config import (
ExitDetachedScanType,
MalwareScanResourceType,
)
from imav.malwarelib.scan import ScanAlreadyCompleteError
from imav.malwarelib.scan.ai_bolit import AIBOLIT, AIBOLIT_PATH
from imav.malwarelib.scan.ai_bolit.report import (
parse_report_csv,
parse_report_json,
)
from imav.malwarelib.scan.detached import DetachedDir, DetachedScan
from imav.malwarelib.scan.utils import trim_file_content
from defence360agent.contracts.messages import MessageType
from defence360agent.utils import rmtree
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Upper bound (in units of the timer passed to kill_running_scan_process(),
# seconds for the default time.monotonic) on how long that method keeps
# retrying to read the PID of the scan process before giving up.
AIBOLIT_PID_WAIT_TIME = 30
class AiBolitDetachedDir(DetachedDir):
    """Detached-scan working directory layout used for AI-BOLIT runs.

    Extends the file map of ``DetachedDir`` with the report, listing and
    scan-info files. Optionally carries a temporary listing file whose
    content is copied into the directory when the context is entered.
    """

    DETACHED_DIR = "/var/imunify360/aibolit/run"

    # Attributes named after the keys of FILES below.
    # NOTE(review): presumably resolved to paths inside the run directory
    # by DetachedDir -- confirm against the base class.
    csv_report_path: Path
    json_report_path: Path
    listing_file: Path
    scan_info_file: Path

    # Base-class entries first, then the AI-BOLIT specific file names.
    FILES = dict(
        DetachedDir.FILES,
        csv_report_path="report.csv",
        json_report_path="report.json",
        listing_file="file",
        scan_info_file="scan_info.json",
    )

    def __init__(self, detached_id, tmp_listing_file=None):
        """
        NOTE: Initialization should not create any files

        :param detached_id: identifier forwarded to ``DetachedDir``
        :param tmp_listing_file: optional object with a ``name`` attribute
            holding the path of a pre-built file listing
        """
        super().__init__(detached_id)
        self.tmp_listing_file = tmp_listing_file

    def __enter__(self):
        """Enter the base context, then copy the temporary listing (if any).

        :return: this instance
        """
        super().__enter__()
        listing_source = self.tmp_listing_file
        if listing_source is None:
            return self
        # Needed in case of internal finder
        shutil.copyfile(listing_source.name, str(self.listing_file))
        return self
@dataclass
class FileScanInfo:
    """Scan metadata as loaded from the ``scan_info.json`` file."""

    # Command line of the scan, one list item per argument.
    cmd: List[str]
    # Scan type label; None when the file did not provide one.
    scan_type: Optional[str]
class AiBolitDetachedScan(DetachedScan):
    """Detached AI-BOLIT file scan working on top of ``AiBolitDetachedDir``.

    Provides progress reporting from the progress file, construction of the
    final ``MalwareScan`` message from the report files, and handling of
    scans that were aborted (optionally killing the scanner process).
    """

    RESOURCE_TYPE = MalwareScanResourceType.FILE
    DETACHED_DIR_CLS = AiBolitDetachedDir

    # Keys of the report's "summary" section that get_reported_summary()
    # forwards unchanged.
    _REPORTED_SUMMARY_KEYS = frozenset(
        (
            "scan_time",
            "report_time",
            "finder_time",
            "cas_time",
            "deobfuscate_time",
            "scan_time_hs",
            "scan_time_preg",
            "smart_time_hs",
            "smart_time_preg",
            "mem_peak",
            "total_files",
            "cpu_user",
            "cpu_system",
            "rchar",
            "wchar",
            "syscr",
            "syscw",
            "read_bytes",
            "write_bytes",
            "cancelled_write_bytes",
            "decision_stats",
            "errors",
        )
    )

    def _parse_report(self):
        """Parse json/csv ai-bolit report

        Parse whichever one exists. We can't use the use_json flag to
        decide which one should exist, because the flag could be changed
        after the report has been created.

        :raises FileNotFoundError: if there is no CSV report and the JSON
            report cannot be opened
        """
        report_dir = self.detached_dir
        if report_dir.csv_report_path.is_file():
            return parse_report_csv(report_dir.csv_report_path)
        with report_dir.json_report_path.open() as fp:
            report = json.load(fp)
        return parse_report_json(report)

    def get_reported_summary(self):
        """Get scan performance metrics if present in the summary

        Reads the "summary" section of the JSON report, keeps the
        whitelisted metric keys and exposes "ai_version" / "db_version"
        as "aibolit_version" / "signatures_version".

        :return: dict with the selected summary values
        :raises FileNotFoundError: if the JSON report does not exist
        """
        with self.detached_dir.json_report_path.open() as f:
            summary = json.load(f).get("summary", {})
        stats = {
            key: value
            for key, value in summary.items()
            if key in self._REPORTED_SUMMARY_KEYS
        }
        if "ai_version" in summary:
            stats["aibolit_version"] = summary["ai_version"]
        if "db_version" in summary:
            stats["signatures_version"] = summary["db_version"]
        return stats

    def _get_progress_info(self):
        """Load the progress file.

        :return: the decoded JSON object, or an empty dict when the file
            is missing, is not valid JSON, or does not hold a JSON object
        """
        try:
            with self.detached_dir.progress_file.open() as fp:
                info = json.load(fp)
        except (FileNotFoundError, json.JSONDecodeError):
            return {}
        # Callers index the result by key; anything but an object is
        # treated like a malformed file.
        return info if isinstance(info, dict) else {}

    def _is_scan_finished(self):
        """Tell whether the "done" marker file exists."""
        return self.detached_dir.done_file.exists()

    @property
    def progress(self):
        """Integer progress value taken from the progress file.

        When the value is absent or cannot be converted to a number the
        result is 100 if the scan is finished and 0 otherwise.
        """
        try:
            return int(float(self._get_progress_info()["progress"]))
        except (KeyError, TypeError, ValueError, OverflowError):
            # TypeError: null/structured value; ValueError: non-numeric
            # text or NaN; OverflowError: infinity.
            return 100 if self._is_scan_finished() else 0

    @property
    def total_resources(self):
        """Number of files reported in the progress file, 0 if unknown."""
        try:
            return int(self._get_progress_info()["files_total"])
        except (KeyError, TypeError, ValueError):
            return 0

    @property
    def phase(self):
        """Human readable phase name derived from the current progress."""
        if self.progress == 0:
            return "preparing file list"
        return "{} scanning".format(AIBOLIT)

    def _load_scan_info(self) -> FileScanInfo:
        """Load the saved command line and scan type.

        A missing, undecodable or non-object scan info file yields an
        empty command and a ``None`` scan type.
        """
        try:
            with self.detached_dir.scan_info_file.open() as fp:
                info = json.load(fp)
        except (FileNotFoundError, json.JSONDecodeError):
            info = {}
        if not isinstance(info, dict):
            info = {}
        return FileScanInfo(
            cmd=info.get("cmd", []), scan_type=info.get("scan_type")
        )

    async def complete(self) -> MessageType.MalwareScan:
        """Build the ``MalwareScan`` message for a finished scan.

        :raises ScanAlreadyCompleteError: if a report file that is needed
            is missing
        """
        # TODO: test this method
        scan_info = self._load_scan_info()
        message = {
            "summary": {
                "args": scan_info.cmd,
                "scanid": self.detached_id,
                "type": scan_info.scan_type,
                "stderr": self.extract_stderr(),
                "stdout": self.extract_stdout(),
            }
        }
        # NOTE(review): the summary is read from the JSON report only, so a
        # missing JSON report ends here even if a CSV report exists --
        # confirm this is intended.
        try:
            reported_summary = self.get_reported_summary()
        except FileNotFoundError as e:
            raise ScanAlreadyCompleteError from e
        try:
            scan_data = self._parse_report()
        except FileNotFoundError as e:
            raise ScanAlreadyCompleteError from e
        except Exception as e:
            # Any other parsing problem is reported inside the message
            # instead of being raised.
            message["summary"]["error"] = str(e)
            message["results"] = []
            logger.exception("Unable to parse AI-BOLIT report")
        else:
            message["summary"].update(reported_summary)
            message["results"] = list(scan_data)
        return MessageType.MalwareScan(message)

    def extract_stdout(self) -> str:
        """Return the content of the log file, "" if it does not exist."""
        try:
            return self.detached_dir.log_file.read_text()
        except FileNotFoundError:
            return ""

    def extract_stderr(self) -> str:
        """Return the content of the error file, "" if it does not exist."""
        try:
            return self.detached_dir.err_file.read_text()
        except FileNotFoundError:
            return ""

    @classmethod
    def process_is_suitable(cls, proc) -> bool:
        """Tell whether *proc* has ``AIBOLIT_PATH`` in its command line.

        :param proc: object exposing ``cmdline()``; falsy values yield False
        """
        if proc:
            return AIBOLIT_PATH in proc.cmdline()
        return False

    async def kill_running_scan_process(self, timer=time.monotonic):
        """Kill the scan process if it can be found and looks like AI-BOLIT.

        Retries ``get_pid()`` once per second until it succeeds or
        ``AIBOLIT_PID_WAIT_TIME`` has elapsed on *timer*. Problems reported
        by psutil are logged, not raised.

        :param timer: callable returning the current time
        """
        error = None
        deadline = timer() + AIBOLIT_PID_WAIT_TIME
        while timer() < deadline:
            try:
                pid = self.get_pid()
                break
            except (FileNotFoundError, ValueError) as err:
                await asyncio.sleep(1)
                error = err
        else:
            # The loop ended without ``break``: no PID within the deadline.
            logger.warning(
                "Cannot find the aibolit process to kill (%s): %r."
                " Assuming it's already dead.",
                self.detached_id,
                error,
            )
            return
        try:
            proc = psutil.Process(pid)
            # Only kill the process when its command line matches.
            if self.process_is_suitable(proc):
                proc.kill()
        except psutil.Error as err:
            logger.warning(
                "Problem when killing the running aibolit process: %s", err
            )

    @staticmethod
    def _path_from_cmd(cmd) -> Optional[str]:
        """Return the item following "--path" in *cmd*, None if absent."""
        try:
            return cmd[cmd.index("--path") + 1]
        except (ValueError, IndexError):
            # No "--path" option, or it is the last item of the command.
            return None

    async def handle_aborted_process(
        self,
        *,
        sink,
        exit_type: str = ExitDetachedScanType.ABORTED,
        kill: bool = True,
        scan_path: Optional[str] = None,
        scan_type: Optional[str] = None,
        scan_started: Optional[float] = None,
    ) -> None:
        """Report an aborted scan to *sink* and remove its run directory.

        :param sink: object whose ``process_message`` coroutine receives
            the generated messages
        :param exit_type: stored as the summary "error"; a ``ScanFailed``
            message is sent as well when it equals
            ``ExitDetachedScanType.ABORTED``
        :param kill: kill the running scan process first
        :param scan_path: scanned path; when omitted it is taken from the
            "--path" option of the saved command, if there is one
        :param scan_type: scan type; falls back to the saved one
        :param scan_started: start timestamp; 0.0 is reported if missing
        """
        if kill:
            await self.kill_running_scan_process()
        scan_dir = self.detached_dir
        stdout = trim_file_content(scan_dir.log_file)
        stderr = trim_file_content(scan_dir.err_file)
        scan_info = self._load_scan_info()
        cmd = scan_info.cmd
        if scan_path is None and cmd:
            # The saved command may lack "--path"; that must not prevent
            # the messages below from being sent or the directory cleanup.
            scan_path = self._path_from_cmd(cmd)
        logger.info("Scan %s was aborted", self.detached_id)
        scan_result = {
            "summary": {
                "scanid": self.detached_id,
                "total_files": 0,
                "total_malicious": 0,
                "completed": time.time(),
                "error": exit_type,
                "started": scan_started or 0.0,
                "type": scan_type or scan_info.scan_type,
                "path": scan_path,
                "stdout": stdout,
                "stderr": stderr,
                "args": cmd,
            },
            "results": {},
        }
        await sink.process_message(MessageType.MalwareScan(**scan_result))
        if exit_type == ExitDetachedScanType.ABORTED:
            msg = MessageType.ScanFailed()
            msg["out"] = stdout
            msg["err"] = stderr
            logger.warning(
                "Scan was aborted: %s", msg["err"] + ", " + msg["out"]
            )
            msg["command"] = cmd
            msg["message"] = "aborted"
            msg["scan_id"] = self.detached_id
            msg["path"] = scan_path
            await sink.process_message(msg)
        if not scan_dir.path.is_dir():
            logger.warning("No such directory: %s", scan_dir)
        else:
            rmtree(str(scan_dir))