import datetime
import json
from contextlib import suppress
from imav.malwarelib.model import MalwareHit
from defence360agent.contracts.license import LicenseCLN
from defence360agent.rpc_tools.lookup import RootEndpoints, bind
from defence360agent.rpc_tools.utils import run_in_executor_decorator
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from defence360agent.subsys.panels.plesk.api import list_docroots_domains_users
from defence360agent.utils import atomic_rewrite, Scope
from defence360agent.subsys.panels.plesk import Plesk
from defence360agent.subsys.features import kernel_care
class PleskStatsEndpoints(RootEndpoints):
MAX_DOMAINS_COUNT = 100
@bind("plesk-stats")
async def plesk_stats(self):
panel = HostingPanel()
assert isinstance(panel, Plesk), "only for plesk"
current_timestamp = int(round(datetime.datetime.now().timestamp()))
last_modified_str = str(
datetime.datetime.fromtimestamp(
current_timestamp,
datetime.timezone.utc,
)
)
domains_stats = await self._domains_stats(
await list_docroots_domains_users(),
)
return {
"items": {
"last_modified": current_timestamp * 1000,
"last_modified_str": last_modified_str,
**domains_stats,
**(await self._get_stats_field_in_plugin_info()),
"license": (1 if LicenseCLN.is_valid() else 0),
}
}
@classmethod
async def _get_stats_field_in_plugin_info(cls):
if not await kernel_care.KernelCare().check_installed():
return {}
plugin_info = await kernel_care.KernelCare().get_plugin_info()
previous = {
"effective_kernel": None,
"first_time_update_available": None,
}
with suppress(FileNotFoundError):
with open(kernel_care.KernelCare.KC_PROPERTIES) as file:
previous = json.load(file)
update_available = plugin_info["updateCode"] == "1"
first_time_update_available = (
datetime.datetime.now(tz=datetime.timezone.utc)
if plugin_info["effectiveKernel"] != previous["effective_kernel"]
else datetime.datetime.fromtimestamp(
previous["first_time_update_available"], datetime.timezone.utc
)
)
outdated_since_days = (
0
if not update_available
else (
datetime.datetime.now(tz=datetime.timezone.utc)
- first_time_update_available
).days
)
atomic_rewrite(
kernel_care.KernelCare.KC_PROPERTIES,
json.dumps(
{
"effective_kernel": plugin_info["effectiveKernel"],
"first_time_update_available": first_time_update_available.timestamp(), # noqa
}
),
backup=False,
)
return {
"kernel_uptodate": plugin_info["autoUpdate"],
"outdated_since_days": outdated_since_days,
}
@run_in_executor_decorator
def _domains_stats(self, plesk_response):
file_names = list(
MalwareHit.select(MalwareHit.orig_file)
.where(MalwareHit.is_infected())
.tuples()
)
# usually infected_users << total_users (according to ch)
# so we can compare each hit only with docroots, whose owners are
# marked as infected in imunify database
infected_users = set(
data[0]
for data in list(
MalwareHit.select(MalwareHit.user)
.where(MalwareHit.is_infected())
.distinct()
.tuples()
)
)
infected_plesk_response = list(
filter(
lambda data: data[2] in infected_users,
plesk_response,
)
)
infected_sites = []
for docroot, domain, user in infected_plesk_response:
for (filename,) in file_names:
if filename.startswith(docroot):
infected_sites.append(domain)
break
return {
"infected_sites": infected_sites[: self.MAX_DOMAINS_COUNT],
"wsites_infected": len(infected_sites),
}