"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import os
import time
from contextlib import suppress
from datetime import datetime, timedelta
from logging import getLogger
from pathlib import Path
from imav.malwarelib.config import (
MalwareHitStatus,
MalwareScanResourceType,
MalwareScanType,
)
from imav.malwarelib.cleanup.cleaner import MalwareCleanupProxy
from imav.malwarelib.model import MalwareHistory, MalwareHit, MalwareScan
from imav.subsys import realtime_av
from defence360agent.api import inactivity
from defence360agent.contracts.messages import MessageType
from defence360agent.internals.global_scope import g
from defence360agent.mr_proper import BaseCleaner
from defence360agent.subsys import persistent_state
from defence360agent.utils import nice_iterator, split_for_chunk
logger = getLogger(__name__)
class OutdatedScansCleaner(BaseCleaner):
PERIOD = timedelta(days=1).total_seconds()
CLEANUP_LIMIT_DELTA = timedelta(days=30)
@classmethod
async def cleanup(cls) -> None:
cleanup_time_limit = int(
(datetime.now() - cls.CLEANUP_LIMIT_DELTA).timestamp()
)
deleted = (
MalwareScan.delete()
.where(MalwareScan.started < cleanup_time_limit)
.execute()
)
logger.info("Cleaned %s outdated scans", deleted)
class OutdatedHistoryCleaner(BaseCleaner):
PERIOD = timedelta(days=1).total_seconds()
CLEANUP_LIMIT_DELTA = timedelta(days=30)
@classmethod
async def cleanup(cls) -> None:
keep_time_threshold = int(
(datetime.now() - cls.CLEANUP_LIMIT_DELTA).timestamp()
)
deleted = (
MalwareHistory.delete()
.where(MalwareHistory.ctime < keep_time_threshold)
.execute()
)
logger.info("Cleaned %s outdated malware history", deleted)
class OutdatedHitsCleaner(BaseCleaner):
PERIOD = int(
os.environ.get(
"IMUNIFY360_OUTDATED_HITS_CHECK_INTERVAL",
timedelta(days=1).total_seconds(),
)
)
REALTIME_SCAN_THRESHOLD = timedelta(minutes=10).total_seconds()
CHUNK_SIZE = 1000
@classmethod
async def _cleanup(cls) -> None:
"""Rescan irrelevant malware hits"""
to_rescan = []
not_exist_hits = []
hits = (
MalwareHit.select()
.where(
MalwareHit.status == MalwareHitStatus.FOUND,
MalwareHit.resource_type == MalwareScanResourceType.FILE.value,
)
.order_by(MalwareHit.timestamp.asc())
)
async for hit in nice_iterator(hits, chunk_size=cls.CHUNK_SIZE):
orig_file_path = Path(hit.orig_file)
try:
file_ctime = orig_file_path.stat().st_ctime
if hit.timestamp < file_ctime:
# rescan the modified files after scanning,
# they may not be infected anymore
realtime_threshold = (
time.time() - cls.REALTIME_SCAN_THRESHOLD
) # don't scan file twice
if (
not realtime_av.should_be_running()
or file_ctime < realtime_threshold
):
to_rescan.append(hit.orig_file)
except FileNotFoundError:
not_exist_hits.append(hit.id)
except OSError as exc:
logger.warning("Can't check file due to %s", exc)
if to_rescan:
for files in split_for_chunk(to_rescan, chunk_size=cls.CHUNK_SIZE):
logger.info("Rescan %s outdated malware files", len(files))
await g.sink.process_message(
MessageType.MalwareRescanFiles(
files=files, type=MalwareScanType.RESCAN_OUTDATED
)
)
# delete db entries for non-existent files
for hits_to_delete in split_for_chunk(
not_exist_hits, chunk_size=cls.CHUNK_SIZE
):
deleted = (
MalwareHit.delete()
.where(MalwareHit.id.in_(hits_to_delete))
.where(MalwareHit.status == MalwareHitStatus.FOUND)
.execute()
)
logger.info("Deleted %s not exist malware hits", deleted)
# don't block the whole loop for too long,
# return control to the loop every iteration
await asyncio.sleep(0)
@classmethod
async def cleanup(cls):
last_clean_timestamp = persistent_state.load_state(
"OutdatedHitsCleaner"
).get("last_clean_timestamp", 0)
if (
not last_clean_timestamp
or (time.time() - last_clean_timestamp) >= cls.PERIOD
): # don't run it too often
with inactivity.track.task("malware hits relevance check"):
try:
await cls._cleanup()
finally:
persistent_state.save_state(
"OutdatedHitsCleaner",
{"last_clean_timestamp": time.time()},
)