"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import json
import logging
import os
import re
from contextlib import suppress
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.plugins import MessageSource
from imav.malwarelib.config import MalwareScanType
from imav.malwarelib.scan.ai_bolit.report import parse_report_json
from imav.malwarelib.scan.scan_result import ScanResult
from imav.malwarelib.subsys.ainotify import Inotify, Watcher
from defence360agent.utils import create_task_and_log_exceptions, Scope
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class AibolitResultsScan(MessageSource):
    """
    Plugin to handle generated ai-bolit scan reports.

    Checks the contents of the *RESULT_SCAN_DIR* for the presence of ai-bolit
    report files that match the *REPORT_FILE_MASK* pattern,
    processes and deletes them.
    """

    SCOPE = Scope.IM360
    # Directory scanned once at start-up and then watched for new reports.
    RESULT_SCAN_DIR = "/var/imunify360/aibolit/resident/out"
    # Report file name: a 36-character id (hex digits and dashes) followed
    # by ".report"; the id is captured as the "uuid" group.
    REPORT_FILE_MASK = re.compile(r"^(?P<uuid>[0-9a-f-]{36})\.report$")

    def __init__(self):
        # Both are created lazily: the task in create_source(), the watcher
        # at the end of the initial scan.
        self._watcher = None
        self._init_task = None

    async def create_source(self, loop, sink):
        """Remember *loop* and *sink* and start the initial scan task.

        The task first processes the reports already present in
        *RESULT_SCAN_DIR* and then installs the inotify watcher.
        """
        self._loop = loop
        self._sink = sink
        self._init_task = create_task_and_log_exceptions(
            self._loop, self._init_handling_and_setup_watcher
        )

    async def shutdown(self):
        """Cancel the initial scan task (if any) and close the watcher.

        Awaiting a cancelled task raises ``asyncio.CancelledError``; that is
        the expected outcome here, so it is suppressed.  Any other exception
        stored in the task still propagates, but the watcher is closed first.
        """
        try:
            if self._init_task is not None:
                self._init_task.cancel()
                with suppress(asyncio.CancelledError):
                    await self._init_task
        finally:
            self._shutdown_watcher()

    def _setup_watcher(self):
        """Create *RESULT_SCAN_DIR* if needed and watch it for new files."""
        # target directory must exist
        os.makedirs(self.RESULT_SCAN_DIR, mode=0o700, exist_ok=True)
        self._watcher = Watcher(
            self._loop, coro_callback=self._handle_incoming_report
        )
        # path in bytes
        self._watcher.watch(
            path=self.RESULT_SCAN_DIR.encode(), mask=Inotify.MOVED_TO
        )

    def _shutdown_watcher(self):
        """Close the watcher if one was created; safe to call repeatedly."""
        watcher, self._watcher = self._watcher, None
        if watcher is not None:
            watcher.close()

    def _get_scan_result_from_report(self, report: dict) -> ScanResult:
        """Build a realtime *ScanResult* from a parsed ai-bolit report.

        Reads ``scan_id``, ``total_files``, ``errors``, ``report_time`` and
        ``scan_time`` from ``report["summary"]``.

        Raises:
            KeyError: *report* has no ``"summary"`` key.
            TypeError: ``report_time`` or ``scan_time`` is missing, so the
                start time cannot be computed.
        """
        report_summary = report["summary"]
        path = [hit["file_name"] for hit in parse_report_json(report)]
        scan_id = report_summary.get("scan_id")
        scan_result = ScanResult(
            path, scan_id=scan_id, scan_type=MalwareScanType.REALTIME
        )
        scan_result.total_files = report_summary.get("total_files")
        scan_result.errors = report_summary.get("errors", [])
        end_time = report_summary.get("report_time")
        scan_time = report_summary.get("scan_time")
        # The report only carries the end time and the duration.
        start_time = end_time - scan_time
        scan_result.set_start_stop(start_time, end_time)
        scan_result.scans = [parse_report_json(report)]
        return scan_result

    async def _handle_report(self, report: dict):
        """Convert *report* to a MalwareScan message and pass it to the sink.

        Any error except cancellation is logged with its traceback and
        swallowed, so one bad report does not stop the handling of others.
        """
        try:
            scan_result = self._get_scan_result_from_report(report)
            result = await scan_result.get()
            await self._sink.process_message(
                MessageType.MalwareScan(**result.to_dict())
            )
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            logger.exception(
                "Error '%r' occurred while processing ai-bolit report: %s",
                exc,
                report,
            )

    async def handle_report_file(self, path: str):
        """Process the report file at *path* and remove it afterwards.

        Paths whose base name does not match *REPORT_FILE_MASK*, or that are
        not regular files, are ignored and left in place.  A report that
        cannot be read, is not valid JSON, or lacks a ``"summary"`` mapping
        is logged and removed without being processed, so that it cannot
        break the start-up scan again on the next run.  If ``scan_id`` is
        absent from the summary, the id from the file name is used.
        """
        match_file = self.REPORT_FILE_MASK.match(os.path.basename(path))
        if not (match_file and os.path.isfile(path)):
            return
        try:
            with open(path) as f:
                report = json.load(f)
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning(
                "Problem with parsing %s aibolit report: %s", path, exc
            )
        else:
            summary = (
                report.get("summary") if isinstance(report, dict) else None
            )
            if isinstance(summary, dict):
                summary.setdefault("scan_id", match_file.group("uuid"))
                await self._handle_report(report)
            else:
                logger.warning(
                    "Problem with parsing %s aibolit report: %s",
                    path,
                    "no 'summary' section",
                )
        # Not in a ``finally``: on cancellation the file is kept, so it is
        # picked up by the start-up scan of the next run.
        with suppress(FileNotFoundError):
            os.unlink(path)

    async def _handle_incoming_report(self, event):
        """Watcher callback: handle the file named by an inotify *event*."""
        logger.info("Inotify event: %s", event)
        report_file = os.path.join(
            os.fsdecode(event.path), os.fsdecode(event.name)
        )
        await self.handle_report_file(report_file)

    async def _handle_existing_reports(self):
        """Process every entry currently present in *RESULT_SCAN_DIR*.

        A missing directory is not an error: there is nothing to process.
        """
        try:
            # Snapshot the listing first: handling a report deletes it, and
            # the directory iterator should not stay open across awaits.
            with os.scandir(self.RESULT_SCAN_DIR) as it:
                paths = [entry.path for entry in it]
        except FileNotFoundError:
            return
        for path in paths:
            await self.handle_report_file(path)

    async def _init_handling_and_setup_watcher(self):
        """Handle leftover reports, then start watching for new ones."""
        # NOTE(review): a report moved in between the directory listing and
        # the watcher set-up is only seen on the next start - confirm that
        # this window is acceptable.
        await self._handle_existing_reports()
        self._setup_watcher()