"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
from __future__ import annotations

import pwd
from dataclasses import dataclass
from pathlib import Path
from typing import Collection, Iterable, Optional, Set, Tuple

from defence360agent.subsys.panels import hosting_panel
from defence360agent.utils.threads import to_thread
from imav.malwarelib.utils.cloudways import CloudwaysUser
@dataclass(frozen=True, eq=True)
class MalwareDatabaseHitError:
    """Immutable, hashable (code, message) pair describing a scan error.

    Instances are built from the ``code`` and ``message`` keys of each
    entry of a report's ``error_list``.
    """

    # Error code taken from the report entry.
    code: int
    # Error text taken from the report entry.
    message: str
@dataclass(eq=True, frozen=True)
class MalwareDatabaseHitInfo:
    """Immutable, hashable description of one malware hit in a database.

    Instances are created either per report (:meth:`from_report`) or per
    infected row (:meth:`iter_from_scan_report`).
    """

    scan_id: str
    # Value of the report's "path" key.
    path: str
    signature: str
    app_name: str
    db_host: str
    db_name: str
    db_port: int
    errors: Tuple[MalwareDatabaseHitError, ...]
    # Value of the report's "app_owner_uid" key.
    owner: int
    # Result of CloudwaysUser.override_uid_by_path() for `path` and `owner`.
    user: int
    # Row location details; they keep their None default for hits created
    # by from_report(), which does not set them.
    table_name: Optional[str] = None
    table_field: Optional[str] = None
    table_row_inf: Optional[int] = None
    snippet: Optional[str] = None

    def _get_db_info(self):
        """Return the tuple of fields used to group hits by database."""
        return (
            self.path,
            self.app_name,
            self.db_host,
            self.db_port,
            self.db_name,
        )

    @classmethod
    def get_hits_per_db(
        cls, hits: Iterable[MalwareDatabaseHitInfo]
    ) -> set[MalwareDatabaseHitInfo]:
        """Return one hit per distinct database.

        Hits are grouped by :meth:`_get_db_info`; the first hit seen for
        each group is the one that is kept.
        """
        first_hit_by_db = {}
        for hit in hits:
            # setdefault() leaves an already stored (earlier) hit untouched.
            first_hit_by_db.setdefault(hit._get_db_info(), hit)
        return set(first_hit_by_db.values())

    @staticmethod
    def _report_fields(report, users_from_panel, pw_all, scan_id) -> dict:
        """Build the constructor kwargs shared by every hit of *report*.

        Raises:
            KeyError: if *report* lacks one of the required keys.
        """
        path: str = report["path"]
        owner: int = report["app_owner_uid"]
        user: int = CloudwaysUser.override_uid_by_path(
            Path(path), owner, users_from_panel, pw_all
        )
        return {
            "scan_id": scan_id,
            "path": path,
            "app_name": report["app"],
            "db_host": report["database_host"],
            "db_name": report["database_name"],
            "errors": tuple(
                MalwareDatabaseHitError(err["code"], err["message"])
                for err in report["error_list"]
            ),
            "db_port": report["database_port"],
            "owner": owner,
            "user": user,
        }

    @classmethod
    def from_report(
        cls, report, users_from_panel, pw_all, scan_id
    ) -> MalwareDatabaseHitInfo:
        """Create a single hit summarising *report*.

        The signature and snippet come from the first entry of
        ``detailed_reports`` (or, when that is missing or empty, of
        ``detailed_urls_reports``). When neither list has entries the
        signature is ``"BAD URL"`` and the snippet is ``None``.

        Raises:
            KeyError: if *report* lacks one of the required keys.
        """
        signature = "BAD URL"
        snippet = None
        detailed_reports = report.get("detailed_reports") or report.get(
            "detailed_urls_reports"
        )
        if detailed_reports:
            first_detail = detailed_reports[0]
            signature = first_detail["sigid"]
            snippet = first_detail.get("snpt")
        return cls(
            signature=signature,
            snippet=snippet,
            **cls._report_fields(report, users_from_panel, pw_all, scan_id),
        )

    @classmethod
    def _get_hits_from_report(
        cls, data: dict, **kwargs
    ) -> Iterable[MalwareDatabaseHitInfo]:
        """Yield one hit per row id listed in the detailed report *data*.

        Missing or ``None`` ``tables`` / ``fields`` / ``row_ids`` values
        are treated as empty. *kwargs* are passed to the constructor
        unchanged.
        """
        for table in data.get("tables") or ():
            for field in table.get("fields") or ():
                for row_id in field.get("row_ids") or ():
                    yield cls(
                        signature=data["sigid"],
                        table_name=table["table"],
                        table_field=field["field"],
                        table_row_inf=int(row_id),
                        snippet=data.get("snpt"),
                        **kwargs,
                    )

    @classmethod
    def iter_from_scan_report(
        cls, report, users_from_panel, pw_all, scan_id
    ) -> Iterable[MalwareDatabaseHitInfo]:
        """Yield a separate hit for each scanned row listed in *report*.

        Entries of ``detailed_reports`` are used when that list is
        non-empty; otherwise entries of ``detailed_urls_reports`` are used.

        Raises:
            KeyError: if *report* lacks one of the required keys.
        """
        shared_fields = cls._report_fields(
            report, users_from_panel, pw_all, scan_id
        )
        # Same fallback chain as from_report(); a missing or None value
        # simply yields nothing.
        detailed_reports = (
            report.get("detailed_reports")
            or report.get("detailed_urls_reports")
            or ()
        )
        for detailed_report in detailed_reports:
            yield from cls._get_hits_from_report(
                detailed_report, **shared_fields
            )
@dataclass(frozen=True, eq=True)
class MalwareDatabaseScanReport:
    """Immutable summary of a malware database scan."""

    # Hits collected for the scan.
    hits: Set[MalwareDatabaseHitInfo]
    # Start and completion times of the scan, as integers.
    started: int
    completed: int
    # Counters describing how much was scanned and how much was malicious.
    total_resources: int
    total_malicious: int
def _last_completed_time(reports: Iterable[dict]) -> int:
    """Return the latest ``start_time + running_time`` among *reports*.

    The result is truncated to ``int``; ``0`` is returned when *reports*
    is empty.
    """
    end_times = [
        entry["start_time"] + entry["running_time"] for entry in reports
    ]
    if not end_times:
        return 0
    return int(max(end_times))
def _first_started_time(reports: Iterable[dict]) -> int:
    """Return the earliest ``start_time`` among *reports*.

    The result is truncated to ``int``; ``0`` is returned when *reports*
    is empty.
    """
    start_times = [entry["start_time"] for entry in reports]
    return int(min(start_times)) if start_times else 0
def _found(reports: Iterable[dict]) -> Iterable[dict]:
    """Lazily yield the reports whose ``"app"`` value is not ``None``."""
    return (entry for entry in reports if entry["app"] is not None)
def _malicious(reports: Iterable[dict]) -> Iterable[dict]:
    """Lazily yield the reports with at least one detected malicious entry.

    A report qualifies when its ``"count_of_detected_malicious_entries"``
    value is greater than zero.
    """
    return (
        entry
        for entry in reports
        if entry["count_of_detected_malicious_entries"] > 0
    )
def _total_scanned_rows(reports: Iterable[dict]) -> int:
    """Return the sum of ``"rows_count"`` over *reports*.

    Reports without a ``"rows_count"`` key contribute ``0``.
    """
    total = 0
    for entry in reports:
        total += entry.get("rows_count", 0)
    return total
async def scan_report(
    hit_report_list: Collection[dict], scan_id: str
) -> MalwareDatabaseScanReport:
    """Build a :class:`MalwareDatabaseScanReport` from raw scan reports.

    Only reports whose ``"app"`` is not ``None`` and that have at least one
    detected malicious entry contribute hits; the time range and the total
    row count are computed over every report in *hit_report_list*.

    Args:
        hit_report_list: raw per-database report dictionaries.
        scan_id: identifier stored on every produced hit.

    Returns:
        The aggregated report; ``total_malicious`` is the number of
        distinct hits.
    """
    panel_users = set(await hosting_panel.HostingPanel().get_users())
    passwd_entries = await to_thread(pwd.getpwall)

    # A set comprehension de-duplicates equal hits across reports.
    hits = {
        hit
        for report in _malicious(_found(hit_report_list))
        for hit in MalwareDatabaseHitInfo.iter_from_scan_report(
            report, panel_users, passwd_entries, scan_id
        )
    }

    return MalwareDatabaseScanReport(
        hits=hits,
        started=_first_started_time(hit_report_list),
        completed=_last_completed_time(hit_report_list),
        total_resources=_total_scanned_rows(hit_report_list),
        total_malicious=len(hits),
    )