import asyncio
import itertools
import logging
from typing import List, Optional
from peewee import BooleanField, CharField
import defence360agent.subsys.panels.hosting_panel as hp
from defence360agent.contracts.messages import MessageType
from defence360agent.model import Model, instance
from defence360agent.model.simplification import run_in_executor
from defence360agent.utils import execute_iterable_expression
from defence360agent.utils.config import update_config
from imav.malwarelib.model import MalwareHit
logger = logging.getLogger(__name__)
class MyImunify(Model):
"""Secure-site related settings"""
class Meta:
database = instance.db
db_table = "myimunify"
#: The username of the end-user, or an empty string for the default value
#: for all new users.
user = CharField(unique=True)
#: Is MyImunify protection enabled/disabled to the end-user.
protection = BooleanField(null=False, default=False)
@classmethod
def get_protection(cls, user: Optional[str]) -> bool:
"""Get SecureSite protection by username"""
if user is None or user == "root":
# root
return True
perm, _ = cls.get_or_create(user=user, defaults={"protection": False})
return perm.protection
@classmethod
def update_users_protection(cls, users: List[str], status: bool):
cls.insert_many(
[{"user": user, "protection": status} for user in users]
).on_conflict(
conflict_target=[cls.user],
preserve=[],
update={cls.protection: status},
).execute()
async def malware_cleanup(sink, user):
hits = MalwareHit.malicious_select(user=user, cleanup=True)
if hits:
await sink.process_message(MessageType.MalwareCleanupTask(hits=hits))
async def update_users_protection(sink, users: List[str], status: bool):
await run_in_executor(
None,
lambda: MyImunify.update_users_protection(users, status),
)
proactive_mode = None
if not status:
proactive_mode = "LOG"
tasks = [
update_config(
sink,
{"PROACTIVE_DEFENCE": {"mode": proactive_mode}},
user,
)
for user in users
]
if status:
tasks += [malware_cleanup(sink, user) for user in users]
await asyncio.gather(*tasks)
async def set_protection_status_for_all_users(sink, status: bool):
"""Set protection status for all users"""
panel_users = await hp.HostingPanel().get_users()
await update_users_protection(sink, panel_users, status)
async def sync_users(sink, users: List[str]) -> bool:
"""Synchronize existing permissions with myimunify users"""
panel_users = set(users)
myimunify_users = MyImunify.select(MyImunify.user)
myimunify_users = set(itertools.chain(*myimunify_users.tuples()))
users_to_remove = myimunify_users - panel_users
if users_to_remove:
logger.info("Remove myimunify users %s", users_to_remove)
def expression(users_to_remove):
return MyImunify.delete().where(
MyImunify.user.in_(users_to_remove)
)
execute_iterable_expression(expression, list(users_to_remove))
users_to_add = panel_users - myimunify_users
if users_to_add:
logger.info("Add permissions to users %s", users_to_add)
await update_users_protection(sink, users, False)
return bool(users_to_add) or bool(users_to_remove)