[ Avaa Bypassed ]




Upload:

Command:

hmhc3928@18.191.176.82: ~ $
import pwd
import uuid
from pathlib import Path
from typing import Dict, List, Optional

from defence360agent.contracts.permissions import logger
from defence360agent.model import instance
from defence360agent.myimunify.model import MyImunify, update_users_protection
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from defence360agent.utils import safe_fileops

MYIMUNIFY_ID_FILE_NAME = ".myimunify_id"


class MyImunifyIdError(Exception):
    """Raised when a MyImunify id cannot be resolved, read or stored."""


async def add_myimunify_user(
    sink, user: str, protection: bool
) -> Optional[str]:
    """
    Register *user* in the MyImunify table, apply the requested protection
    setting and return the user's MyImunify id.

    Returns None when the id cannot be obtained (MyImunifyIdError).
    """

    record, _created = MyImunify.get_or_create(user=user)
    record.save()
    await update_users_protection(sink, [user], protection)
    logger.info("Applied setting MyImunify=%s for user %s", protection, user)

    try:
        return await _get_or_generate_id(user)
    except MyImunifyIdError:
        # The id could not be resolved, e.g. the user no longer exists
        return None


async def get_myimunify_users() -> List[Dict]:
    """
    Build a list of dicts describing every MyImunify user: email, username,
    MyImunify id, protection flag and locale. Entries are ordered by username.
    """

    result = []
    panel_details = await HostingPanel().get_user_details()
    ids_by_user = await _myimunify_user_to_id()
    with instance.db.transaction():
        for username in sorted(ids_by_user):
            db_row, _created = MyImunify.get_or_create(user=username)
            # Users unknown to the panel fall back to empty email/locale
            details = panel_details.get(username, {})
            entry = {
                "email": details.get("email", ""),
                "username": username,
                "myimunify_id": ids_by_user[username],
                "protection": db_row.protection,
                "locale": details.get("locale", ""),
            }
            result.append(entry)
    return result


async def _myimunify_user_to_id() -> Dict[str, str]:
    """
    Map every panel user to its MyImunify id.

    Users whose id cannot be obtained are left out of the mapping.
    """

    mapping = {}
    panel_users = await HostingPanel().get_users()
    for username in panel_users:
        try:
            resolved_id = await _get_or_generate_id(username)
        except MyImunifyIdError:
            # Id could not be resolved (e.g. user does not exist): skip
            pass
        except safe_fileops.UnsafeFileOperation as exc:
            logger.error(
                "Unable to generate id for user=%s, error=%s",
                username,
                str(exc),
            )
        else:
            mapping[username] = resolved_id
    return mapping


async def _get_or_generate_id(user: str) -> str:
    """
    Return the MyImunify id stored in the user's id file; when the file is
    missing or holds no id, generate a new one and write it to the file.
    """
    id_path = await _get_myimunify_id_file(user)
    try:
        stored_id = _read_id(id_path)
    except (FileNotFoundError, MyImunifyIdError):
        # Nothing usable on disk: create a fresh id and persist it
        fresh_id = uuid.uuid1().hex
        return await _write_id(fresh_id, id_path)
    return stored_id


async def _write_id(myimunify_id: str, id_file: Path) -> str:
    """
    Store *myimunify_id* in *id_file* below a short comment header and
    return the id.

    Raises MyImunifyIdError when the file cannot be written.
    """
    header = (
        "# DO NOT EDIT\n"
        "# This file contains MyImunify id unique to this user\n"
    )
    content = f"{header}\n{myimunify_id}\n"
    try:
        await safe_fileops.write_text(str(id_file), content)
    except OSError as exc:  # PermissionError is a subclass of OSError
        logger.error("Unable to write myimunify_id in user home dir: %s", exc)
        raise MyImunifyIdError from exc
    return myimunify_id


def _read_id(id_file: Path) -> str:
    """Read MyImunify id from file.

    The file is scanned from the last line to the first. The first line
    that, after trimming surrounding whitespace, is neither empty nor a
    ``#`` comment is returned as the id.

    Raises:
        FileNotFoundError: propagated from opening a missing file.
        MyImunifyIdError: the file holds no id, or its content cannot be
            decoded as text.
    """

    try:
        with id_file.open("r") as f:
            lines = f.readlines()
    except UnicodeDecodeError as e:
        # Undecodable content is reported like any other "no valid id"
        # condition instead of leaking a decoding error to the callers.
        raise MyImunifyIdError from e

    for line in reversed(lines):
        candidate = line.strip()
        # Strip before the comment check so that indented comment lines
        # are skipped rather than returned as an id.
        if candidate and not candidate.startswith("#"):
            return candidate

    raise MyImunifyIdError


async def _get_myimunify_id_file(user: str) -> Path:
    """
    Return the path of the MyImunify id file in the user's home directory,
    creating an empty file when it is not there yet.

    Raises MyImunifyIdError when the user or the home directory does not
    exist, or when the file cannot be created.
    """

    try:
        passwd_entry = pwd.getpwnam(user)
    except KeyError as exc:
        logger.error("No such user: %s", user)
        raise MyImunifyIdError from exc

    id_path = Path(passwd_entry.pw_dir) / MYIMUNIFY_ID_FILE_NAME
    if id_path.exists():
        return id_path

    # The file is missing: make sure there is a home directory to put it in
    if not id_path.parent.exists():
        logger.error("No such user homedir: %s", user)
        raise MyImunifyIdError

    try:
        await safe_fileops.touch(str(id_path))
    except OSError as exc:  # PermissionError is a subclass of OSError
        logger.error("Unable to put myimunify_id in user home dir: %s", exc)
        raise MyImunifyIdError from exc
    return id_path

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 0 B 0644
config.py File 41.43 KB 0644
config_provider.py File 9.36 KB 0644
eula.py File 1.47 KB 0644
hook_events.py File 1.51 KB 0644
hooks.py File 6.14 KB 0644
license.py File 18.02 KB 0644
messages.py File 11.94 KB 0644
myimunify_id.py File 4.4 KB 0644
permissions.py File 4.04 KB 0644
plugins.py File 8.1 KB 0644
sentry.py File 2.08 KB 0644