import asyncio
import json
import logging
import os
import socket
import urllib.error
import urllib.parse
import urllib.request
from collections import defaultdict
from functools import lru_cache
from pathlib import Path
from urllib.parse import parse_qsl, urlencode, urljoin, urlparse, urlunparse
import psutil
from defence360agent.contracts.config import ANTIVIRUS_MODE
from defence360agent.contracts.license import LicenseCLN
from defence360agent.utils import CheckRunError, async_lru_cache, check_run
from defence360agent.utils.common import get_hostname
# Default timeout, in seconds, handed to urlopen() for every CLN request.
_TIMEOUT = 300

# Helper binary queried with "status" to learn whether imunify-email is on.
_IMUNIFY_EMAIL_CONFIG_EXECUTABLE = Path("/usr/sbin/ie-config")

logger = logging.getLogger(__name__)

# Shell pipeline: download the imunify-email deploy script and run it in
# 'is-supported' mode.  It is executed with shell=True, hence a single string.
IE_SUPPORTED_CMD = " | ".join(
    (
        "wget -qq -O -"
        " https://repo.imunify360.cloudlinux.com/defence360/"
        "imunifyemail-deploy.sh",
        "bash -s 'is-supported'",
    )
)
@async_lru_cache(maxsize=1)
async def is_imunify_email_supported() -> bool:
    """Tell whether the deploy script reports imunify-email as supported.

    Runs ``IE_SUPPORTED_CMD`` through the shell.  A clean exit means
    "supported"; any ``CheckRunError`` means "not supported".

    The coroutine is wrapped in ``async_lru_cache(maxsize=1)``, so the answer
    is presumably computed once and then reused (verify against the
    decorator's implementation).

    :return: True when the command succeeds, False otherwise
    """
    try:
        await check_run(IE_SUPPORTED_CMD, shell=True)
    except CheckRunError as err:
        # Exit code 100 is deliberately not logged -- presumably it is the
        # script's regular "not supported" answer (TODO confirm).
        unexpected_failure = err.returncode != 100
        if unexpected_failure:
            logger.error(f"imunify-email check failed {str(err)}")
        return False
    else:
        return True
async def get_imunify_email_status():
    """Report whether imunify-email's exim spam filter is enabled.

    :return: False in antivirus mode, when the ``ie-config`` executable is
        missing, or when ``ie-config status`` fails; otherwise whether its
        output contains ``spamfilter exim configuration: enabled``
    """
    # Nothing to ask in antivirus mode or without the helper binary.
    if ANTIVIRUS_MODE or not _IMUNIFY_EMAIL_CONFIG_EXECUTABLE.exists():
        return False

    command = [str(_IMUNIFY_EMAIL_CONFIG_EXECUTABLE), "status"]
    try:
        raw_output = await check_run(command)
    except CheckRunError:
        return False

    status_text = raw_output.decode()
    return "spamfilter exim configuration: enabled" in status_text
class CLNError(Exception):
    """Error reported by CLN or detected while decoding its answer.

    :param status: HTTP status code received from CLN, if any
    :param message: error text; when empty or None, ``str()`` falls back to
        a generic message built from *status*
    """

    def __init__(self, status=None, message=None):
        self.message = message
        self.status = status

    def __str__(self):
        if self.message:
            return self.message
        return "Unexpected status code from CLN: {}".format(self.status)


class InvalidLicenseError(Exception):
    """Raised when a license key cannot be used to register this server."""


class BackupNotFound(CLNError):
    """CLN reported that no backup exists for the server.

    :param url: backup purchase URL, or None when purchasing is not offered
    """

    # Bytes per gibibyte; used to express disk usage in whole GB.
    GB = 1024 * 1024 * 1024

    def __init__(self, url):
        # Initialise ``status``/``message`` so that generic ``CLNError``
        # handlers reading those attributes do not hit AttributeError.
        super().__init__()
        self.url = url

    def __str__(self):
        return "Backup not found in CLN"

    def add_used_space(self):
        """Return ``self.url`` with a ``used_space`` query parameter.

        The parameter carries the local disk usage in GB and replaces any
        ``used_space`` value already present in the URL.

        :return: the amended URL, or None when no URL is available
        """
        if self.url is None:
            return None
        parsed = urlparse(self.url)
        query = dict(parse_qsl(parsed.query))
        query["used_space"] = self._disk_usage()
        return urlunparse(parsed._replace(query=urlencode(query)))

    def _disk_usage(self):
        """Return the used disk space, summed over partitions, in whole GB.

        Each device is counted once; partitions whose options mention
        ``noauto`` and ``/dev/loop*`` devices are skipped.
        """
        total_used = 0
        processed = set()
        for part in psutil.disk_partitions():
            if part.device in processed:
                # the same device can be mounted more than once
                continue
            if "noauto" in part.opts or part.device.startswith("/dev/loop"):
                continue
            total_used += psutil.disk_usage(part.mountpoint).used
            processed.add(part.device)
        return round(total_used / self.GB)
def _post_request(url, data=None, headers=None, timeout=None):
    """Send a blocking HTTP request to CLN and decode its JSON answer.

    To be used by RestCLN._request().

    :param url: endpoint URL
    :param data: request body: ``bytes`` are sent as is, ``str`` is encoded
        as UTF-8, anything else is form-urlencoded (expected to be a dict);
        with None no body is attached
    :param headers: optional request headers; when given they are used as
        is and no default ``Content-type`` is added here
    :param timeout: timeout passed to ``urllib.request.urlopen``
    :return: ``(status, payload)`` -- decoded JSON for 200/244, None for 204
    :raise TimeoutError: the socket timed out while receiving or reading
    :raise CLNError: HTTP error status, unexpected success status, or a
        200/244 body that is not valid UTF-8 encoded JSON
    :raise OSError: any other network failure (logged and re-raised)
    """
    kwargs = {}
    if headers is not None:
        kwargs["headers"] = headers
    if data is not None:
        if isinstance(data, bytes):
            content_type = "application/octet-stream"
        elif isinstance(data, str):
            data = data.encode("utf-8")
            content_type = "text/plain; charset=utf-8"
        else:  # dict
            data = urllib.parse.urlencode(data).encode("ascii")
            content_type = "application/x-www-form-urlencoded"
        # caller-supplied headers take precedence over the default
        kwargs.setdefault("headers", {"Content-type": content_type})
        kwargs["data"] = data

    try:
        resp = urllib.request.urlopen(
            urllib.request.Request(url, **kwargs), timeout=timeout
        )
    except socket.timeout:
        raise TimeoutError("Timed out receiving response")
    except OSError as e:
        if not hasattr(e, "code"):  # not an HTTPError
            logger.warning(
                "CLN.post(url=%r, data=%r, headers=%r, timeout=%r): %s",
                url,
                data,
                headers,
                timeout,
                e,
            )
            raise
        if e.code < 400:
            raise CLNError(e.code) from e
        # e.code >= 400
        message = None
        if e.fp is not None:
            logger.warning(
                "CLN.post(url=%r, data=%r, headers=%r): %d %s",
                url,
                data,
                headers,
                e.code,
                e.reason,
            )
            try:
                resp_data = e.read()
            except socket.timeout:
                raise TimeoutError("Timed out reading error message")
            # the response may be non-json
            message = resp_data.decode(errors="replace")
        raise CLNError(message=message, status=e.code) from e

    with resp:
        if resp.code == 204:
            return resp.code, None
        # 244 - /im/ab/check returns link for backup buy page
        if resp.code not in (200, 244):
            raise CLNError(resp.code)
        try:
            content = resp.read()
        except socket.timeout:
            raise TimeoutError("Timed out reading response")
        try:
            return resp.code, json.loads(content.decode())
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            # A body that is not valid UTF-8 is just as much "non-json" as
            # one that fails to parse; report both as CLNError so callers
            # have a single exception type to handle.
            raise CLNError(
                message=(
                    f"Non-json data from CLN: {content} for"
                    f" code={resp.code}"
                ),
                status=resp.code,
            ) from e
class RestCLN:
    """Asynchronous client for the CLN REST endpoints.

    Every public coroutine performs one request through ``_request``, which
    runs the blocking ``_post_request`` helper in the loop's default
    executor.
    """

    # The base URL may be overridden through the environment.
    # NOTE: endpoint URLs are derived with urljoin(), which replaces the last
    # path segment of a base that does not end with "/".
    _BASE_URL = os.environ.get(
        "IM360_CLN_API_BASE_URL", "https://cln.cloudlinux.com/api/im/"
    )
    _REGISTER_URL = urljoin(_BASE_URL, "register")
    _UNREGISTER_URL = urljoin(_BASE_URL, "unregister")
    _CHECKIN_URL = urljoin(_BASE_URL, "checkin")
    _ACRONIS_CREDENTIALS_URL = urljoin(_BASE_URL, "ab/credentials")
    _ACRONIS_REMOVE_URL = urljoin(_BASE_URL, "ab/remove")
    _ACRONIS_CHECK_URL = urljoin(_BASE_URL, "ab/check")

    STATUS_OK_PAID_LICENSE = "ok"
    STATUS_OK_TRIAL_LICENSE = "ok-trial"

    @classmethod
    async def _request(cls, url, *, data=None, headers=None, timeout=_TIMEOUT):
        """Run ``_post_request`` in the default executor and await it.

        :return: whatever ``_post_request`` returns: ``(status, payload)``
        """
        loop = asyncio.get_event_loop()
        pending = loop.run_in_executor(
            None, _post_request, url, data, headers, timeout
        )
        return await pending

    @classmethod
    async def register(cls, key: str) -> dict:
        """
        Register the server with a key.

        :param key: registration key
        :return: license token in case of success
        """
        payload = {"key": key}
        _status, license_token = await cls._request(
            cls._REGISTER_URL, data=payload
        )
        return license_token

    @classmethod
    async def checkin(
        cls,
        server_id: str,
        users_count: int,
        hostname: str = None,
    ):
        """
        Update the license token.

        :param str server_id: server id
        :param int users_count: users count
        :param str hostname: current server hostname; looked up with
            ``get_hostname()`` when not given
        :return: dict new license token
        """
        if not hostname:
            hostname = get_hostname()
        email_enabled = await get_imunify_email_status()
        email_supported = await is_imunify_email_supported()

        im_section = {
            "users": users_count,
            "imunifyEmail": email_enabled,
            "supported_features": {"IM_EMAIL": email_supported},
        }
        body = json.dumps(
            {"id": server_id, "hostname": hostname, "im": im_section}
        )
        logger.info("CLN checkin: %s", body)

        _status, new_token = await cls._request(
            cls._CHECKIN_URL,
            data=body,
            headers={"Content-type": "application/json"},
        )
        return new_token

    @classmethod
    async def acronis_credentials(cls, server_id: str) -> dict:
        """
        Create an Acronis Backup account and fetch its user & password.

        :param server_id: server id
        """
        _status, credentials = await cls._request(
            cls._ACRONIS_CREDENTIALS_URL, data={"id": server_id}
        )
        return credentials

    @classmethod
    async def acronis_remove(cls, server_id: str):
        """
        Remove the Acronis Backup account.

        :param server_id: server id
        """
        await cls._request(cls._ACRONIS_REMOVE_URL, data={"id": server_id})

    @classmethod
    async def acronis_check(cls, server_id: str) -> dict:
        """
        Ask CLN about the Acronis backup of the server.

        :param server_id: server id
        :return: CLN's answer (per the original note, the backup size in GB
            when the account exists)
        :raise BackupNotFound: CLN answered with status 244
        """
        status, answer = await cls._request(
            cls._ACRONIS_CHECK_URL, data={"id": server_id}
        )
        backup_missing = status == 244
        if backup_missing:
            # url=None: prohibit purchasing a new backup
            raise BackupNotFound(url=None)
        return answer

    @classmethod
    async def unregister(cls, server_id=None):
        """
        Unregister a server id.

        :param server_id: id to release; defaults to the id stored in
            ``LicenseCLN``
        :return: None
        """
        if not server_id:
            server_id = LicenseCLN.get_server_id()
        await cls._request(cls._UNREGISTER_URL, data={"id": server_id})
class CLN:
    """License operations that notify subscribers after each change."""

    # method name -> set of zero-argument coroutine functions that are
    # awaited after the method of that name has finished its work
    _CALLBACKS = defaultdict(set)

    @classmethod
    def add_callback_for(cls, method_name, coro_callback):
        """Subscribe *coro_callback* to be awaited after *method_name*."""
        cls._CALLBACKS[method_name].add(coro_callback)

    @classmethod
    async def run_callbacks_for(cls, method_name):
        """Await every callback registered for *method_name*.

        A failing callback is logged and does not prevent the remaining
        ones from running; cancellation is propagated.
        """
        # Iterate over a snapshot: control is yielded at every await, so the
        # set may grow meanwhile, which would otherwise raise RuntimeError.
        for callback in tuple(cls._CALLBACKS[method_name]):
            try:
                await callback()
            except asyncio.CancelledError:
                raise
            except Exception as e:
                logger.exception(
                    "Error '%r' happened when run callback %s for"
                    " CLN %s method",
                    e,
                    callback,
                    method_name,
                )

    @classmethod
    def is_avp_key(cls, key):
        """Return True when *key* starts with the ``IMAVP`` prefix."""
        return key.startswith("IMAVP")

    @classmethod
    async def register(cls, key):
        """Register the server with *key* and store the received license.

        :raise InvalidLicenseError: an ``IMAVP`` key is used outside
            antivirus mode, or the received license is not valid here
        """
        if cls.is_avp_key(key) and not ANTIVIRUS_MODE:
            raise InvalidLicenseError(
                "Imunify360 can not be registered with ImunifyAV+ key"
            )
        license_token = await RestCLN.register(key)
        # in case of IP license, we have to register to know if license is
        # valid for server (i.e. Imunify360 license is used for Imunify360)
        if not LicenseCLN.is_valid(license_token):
            # release registered server id
            await RestCLN.unregister(license_token["id"])
            raise InvalidLicenseError("License is invalid for this server")
        LicenseCLN.update(license_token)
        await cls.run_callbacks_for("register")

    @classmethod
    async def unregister(cls):
        """Unregister the server in CLN and delete the local license."""
        await RestCLN.unregister()
        LicenseCLN.delete()
        await cls.run_callbacks_for("unregister")

    @classmethod
    async def refresh_token(cls, token):
        """Refresh the license token through a CLN check-in.

        Free and alternative (self-signed) licenses are not refreshed.  When
        CLN returns no token, the server is unregistered.

        :param token: current token; its ``"id"`` is sent to CLN
        :return: ``LicenseCLN.get_token()`` as it is after the operation
        """
        if LicenseCLN.is_free():
            # noop: free license can not be refreshed
            return LicenseCLN.get_token()
        if LicenseCLN.get_token().get("is_alternative"):
            # self-signed licenses are refreshed by customer
            return LicenseCLN.get_token()
        new_token = await RestCLN.checkin(token["id"], LicenseCLN.users_count)
        logger.info("Got new token from CLN: %s", new_token)
        if new_token is None:
            await CLN.unregister()
        else:
            LicenseCLN.update(new_token)
            await cls.run_callbacks_for("refresh_token")
        return LicenseCLN.get_token()
def subscribe_to_license_changes(coro):
    """Register *coro* as a callback for every license-changing CLN method.

    :param coro: coroutine function added for ``register``, ``unregister``
        and ``refresh_token``
    """
    watched_methods = ("register", "unregister", "refresh_token")
    for name in watched_methods:
        CLN.add_callback_for(name, coro_callback=coro)