import asyncio
import functools
import json
import logging
from collections import defaultdict
from typing import Dict, List, Optional, Set, Sequence
import cerberus
import yaml
from defence360agent.contracts.config import NoCP as Config
from defence360agent.utils import get_non_system_users, run
from .. import base
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Path template of the YAML validation schema; the `{}` placeholder is
# filled with the API version number by `_get_validator()`.
_SCHEMA_PATH_TMPL = (
"/opt/imunify360/venv/share/imunify360/no_cp/schema-v{}.yaml"
)
@functools.lru_cache(maxsize=2)
def _get_validator(version: int) -> cerberus.Validator:
    """Build the cerberus validator for the given API version.

    The schema is loaded from the YAML file whose path is obtained by
    filling *version* into ``_SCHEMA_PATH_TMPL``.  Results are memoized
    per version by ``functools.lru_cache``.
    """
    schema_path = _SCHEMA_PATH_TMPL.format(version)
    with open(schema_path) as stream:
        parsed_schema = yaml.safe_load(stream)
    return cerberus.Validator(parsed_schema)
async def _get_client_data() -> Optional[dict]:
    """Run the client script, validate its JSON output and return it.

    The script ``Config.CLIENT_SCRIPT`` is invoked with the latest
    supported API version (``Config.LATEST_VERSION``) as its only argument.

    Returns:
        The decoded and schema-validated JSON object, or ``None`` when
        running the script raises ``FileNotFoundError``.

    Raises:
        ScriptError: if the script cannot be run, exits with a non-zero
            code, prints output that is not a JSON object, reports a
            missing or unsupported ``version``, or fails schema validation.
        asyncio.CancelledError: propagated unchanged.
    """
    try:
        result = await run([Config.CLIENT_SCRIPT, str(Config.LATEST_VERSION)])
    except FileNotFoundError:
        return None
    except asyncio.CancelledError:
        # Cancellation must never be converted into a ScriptError.
        raise
    except Exception as exc:
        raise ScriptError("failed to run script") from exc

    code, stdout, _ = result
    if code != 0:
        raise ScriptError("exited with code: {}".format(code))

    try:
        data = json.loads(stdout.decode())
    except Exception as exc:
        raise ScriptError("Cannot decode output as JSON") from exc

    # Valid JSON may still be a list/number/string; without this guard the
    # `.get()` below would raise AttributeError instead of ScriptError.
    if not isinstance(data, dict):
        raise ScriptError("output is not a JSON object")

    version = data.get("version")
    if version is None:
        raise ScriptError("output does not have`version` field")
    # The supported range comes from the config class (imported as `Config`),
    # not from the `NoCP` panel class defined in this module.
    # `bool` is a subclass of `int`, so it is rejected explicitly.
    if (
        not isinstance(version, int)
        or isinstance(version, bool)
        or not 1 <= version <= Config.LATEST_VERSION
    ):
        raise ScriptError("invalid API version: {}".format(version))

    validator = _get_validator(version)
    if not validator.validate(data):
        # Interpolate the errors into the message instead of passing them
        # as a second exception argument with an unfilled `{}` placeholder.
        raise ScriptError("validation error: {}".format(validator.errors))
    return data
class ScriptError(base.PanelException):
    """Raised when the client script fails or its output is invalid."""
class NoCP(base.AbstractPanel):
    """Panel implementation registered under the name "no panel".

    Domain and owner information is taken from the data returned by
    ``_get_client_data()``; the user list comes from
    ``get_non_system_users()``.
    """

    NAME = "no panel"
    exception = ScriptError

    @classmethod
    def is_installed(cls):
        """Always return ``False``."""
        return False

    async def enable_imunify360_plugin(self, name=None):
        """Do nothing."""
        return None

    async def disable_imunify360_plugin(self, plugin_name=None):
        """Do nothing."""
        return None

    @classmethod
    async def version(cls):
        """Return the constant version string ``"0"``."""
        return "0"

    async def get_user_domains(self):
        """Return the names of all domains reported by the client script.

        An empty list is returned when no client data is available.
        """
        client_data = await _get_client_data()
        names = []
        if client_data is not None:
            for entry in client_data["domains"]:
                names.append(entry["name"])
        return names

    async def get_users(self) -> List[str]:
        """Return the login names of all non-system users."""
        names = []
        for entry in get_non_system_users():
            names.append(entry.pw_name)
        return names

    async def get_domain_to_owner(self) -> Dict[str, List[str]]:
        """Map each domain name to a one-element list holding its owner.

        An empty dict is returned when no client data is available.
        """
        client_data = await _get_client_data()
        owners = {}  # type: Dict[str, List[str]]
        if client_data is not None:
            for entry in client_data["domains"]:
                owners[entry["name"]] = [entry["owner"]]
        return owners

    async def get_domains_per_user(self) -> Dict[str, List[str]]:
        """Map each owner to the list of domain names they own.

        An empty dict is returned when no client data is available.
        """
        client_data = await _get_client_data()
        grouped = {}  # type: Dict[str, List[str]]
        if client_data is not None:
            for entry in client_data["domains"]:
                grouped.setdefault(entry["owner"], []).append(entry["name"])
        return grouped

    def basedirs(self) -> Set[str]:
        """Return an empty set."""
        return set()

    async def list_docroots(self) -> Dict[str, str]:  # pragma: no cover
        """Return an empty dict."""
        return {}