"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import re
from asyncio import CancelledError, Queue
from contextlib import suppress
from logging import getLogger
from typing import List
from defence360agent.api import inactivity
from defence360agent.contracts.config import Malware as Config
from defence360agent.contracts.license import LicenseError
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.plugins import (
MessageSink,
MessageSource,
expect,
)
from defence360agent.utils import recurring_check
from imav.malwarelib.utils import malware_response
# Module-level logger, named after this module's import path.
logger = getLogger(__name__)
class MRSUploader(MessageSink, MessageSource):
    """Plugin that submits files found by a malware scan to MRS.

    The flow visible in this class is:

    1. ``process_scan`` receives a ``MalwareScan`` message, groups the
       scanned files by verdict and emits one ``MalwareMRSUpload`` message
       per non-empty group (plus one for files listed as scan errors).
    2. ``process_hits`` receives a ``MalwareMRSUpload`` message, asks
       ``malware_response.check_known_hashes`` which hashes are unknown and
       queues the corresponding files.
    3. ``upload`` runs as a background task that drains the queue and
       uploads the files one by one.
    """

    # Prefix of the warning logged when a queued file no longer exists.
    ERR_MSG = "Failed to submit a file"
    # Signature names that qualify a hit flagged as suspicious for the
    # "suspicious" upload group. Used with ``match()``, so the pattern is
    # anchored at the start of the signature name only.
    SUSP_PATTERN = re.compile(r"(?:suspicious\..+|[CS]MW-SUS-.+|SMW-HEUR-ELF)")

    def __init__(self):
        # Items are ``(files, upload_reason, message)`` tuples.
        self._upload_queue = Queue()
        # Populated by ``create_source``; declared here so that ``shutdown``
        # is safe to call on an instance that was never started.
        self._sink = None
        self._loop = None
        self._upload_task = None

    async def create_source(self, loop, sink):
        """Remember *loop* and *sink* and start the background upload task."""
        self._sink = sink
        self._loop = loop
        self._upload_task = loop.create_task(self.upload())

    async def create_sink(self, loop):
        """Nothing to set up for the sink side of the plugin."""
        pass

    async def shutdown(self):
        """Cancel the background upload task and wait for it to finish.

        Does nothing when the task was never started.
        """
        task = self._upload_task
        if task is None:
            return
        task.cancel()
        with suppress(CancelledError):
            await task

    def _separate_hits_by_type(self, results) -> tuple:
        """Group scanned files by the strongest verdict among their hits.

        :param results: mapping of file path to a dict with the keys
            ``"hits"`` (iterable of hit dicts) and ``"hash"``.
        :return: ``(malicious, suspicious, extended_suspicious)`` -- three
            lists of ``malware_response.HitInfo``.

        A file lands in exactly one list; the precedence is
        extended-suspicious, then suspicious, then malicious. A file none of
        whose hits qualifies (e.g. it has no hits, or only suspicious hits
        whose signature does not match ``SUSP_PATTERN``) is left out.
        """
        malicious = []
        suspicious = []
        extended_suspicious = []
        for file, data in results.items():
            is_extended_suspicious = False
            is_suspicious = False
            is_malicious = False
            for hit in data["hits"]:
                # bool() keeps ``|=`` from raising TypeError when the key is
                # present but holds None instead of a boolean.
                is_extended_suspicious |= bool(
                    hit.get("extended_suspicious", False)
                )
                is_suspicious |= bool(
                    hit["suspicious"]
                    and self.SUSP_PATTERN.match(hit["matches"])
                )
                is_malicious |= not hit["suspicious"]

            hit_info = malware_response.HitInfo(file, data["hash"])
            if is_extended_suspicious:
                extended_suspicious.append(hit_info)
            elif is_suspicious:
                suspicious.append(hit_info)
            elif is_malicious:
                malicious.append(hit_info)
        return malicious, suspicious, extended_suspicious

    @expect(MessageType.MalwareScan)
    async def process_scan(self, message):
        """Turn scan results into ``MalwareMRSUpload`` requests.

        Returns without emitting anything when the message carries no
        results or when ``Config.SEND_FILES`` is disabled. Otherwise emits up
        to four messages, in this order: "malicious", "suspicious",
        "extended-suspicious" and "scan_error".
        """
        results = message["results"]
        if results is None:
            return
        if not Config.SEND_FILES:
            logger.info("Uploading files to MRS is disabled")
            return

        (
            malicious_hits,
            suspicious_hits,
            extended_suspicious_hits,
        ) = self._separate_hits_by_type(results)

        # The actual upload happens in the background task; here we only
        # request it, one message per non-empty group.
        groups = (
            (malicious_hits, "malicious"),
            (suspicious_hits, "suspicious"),
            (extended_suspicious_hits, "extended-suspicious"),
        )
        for hits, upload_reason in groups:
            if hits:
                await self._sink.process_message(
                    MessageType.MalwareMRSUpload(
                        hits=hits, upload_reason=upload_reason
                    )
                )

        # Files listed under the summary's "errors" are submitted as well.
        errors = message["summary"].get("errors")
        if errors:
            error_hits = [
                malware_response.HitInfo(hit["file"], hit["hash"])
                for hit in errors
            ]
            await self._sink.process_message(
                MessageType.MalwareMRSUpload(
                    hits=error_hits, upload_reason="scan_error"
                )
            )

    @recurring_check(0)
    async def upload(self):
        """Drain the upload queue forever, one queued batch at a time.

        NOTE(review): ``recurring_check(0)`` presumably re-runs this
        coroutine if it exits with an error -- confirm against
        ``defence360agent.utils``.
        """
        while True:
            files, upload_reason, message = await self._upload_queue.get()
            try:
                await self._upload_files(files, upload_reason, message)
            finally:
                # Always balance get() so the queue's unfinished-task
                # counter stays correct even when the upload fails.
                self._upload_queue.task_done()

    async def _upload_files(self, files, upload_reason, message):
        """Upload every file in *files*, tracked as an "mrs_upload" task.

        :param files: iterable of file paths to upload.
        :param upload_reason: forwarded to ``upload_with_retries``.
        :param message: the originating message; used for logging only.

        A ``LicenseError`` stops the whole batch; a ``FileNotFoundError``
        only skips the affected file.
        """
        with inactivity.track.task("mrs_upload"):
            for file in files:
                try:
                    await malware_response.upload_with_retries(
                        file, upload_reason=upload_reason
                    )
                except LicenseError as e:
                    logger.warning("Cannot process message %s: %s", message, e)
                    break
                except FileNotFoundError as e:
                    err = "{}. {}".format(self.ERR_MSG, e.strerror)
                    logger.warning("%s: %s", err, e.filename)

    @expect(MessageType.MalwareMRSUpload)
    async def process_hits(self, message):
        """Queue the files whose hashes are reported as unknown.

        Reads ``message["hits"]`` and the optional ``"upload_reason"``
        (default ``"suspicious"``). Nothing is queued when
        ``check_known_hashes`` returns an empty result.
        """
        hits: List[malware_response.HitInfo] = message["hits"]
        upload_reason = message.get("upload_reason", "suspicious")
        unknown_hashes = await malware_response.check_known_hashes(
            self._loop, (hit.hash for hit in hits), upload_reason
        )
        if not unknown_hashes:
            logger.info("All files are known to MRS. Skipping uploading...")
            return
        # Lazy on purpose: the generator is consumed once by the upload task.
        files = (hit.file for hit in hits if hit.hash in unknown_hashes)
        self._upload_queue.put_nowait((files, upload_reason, message))