import asyncio
import functools
import logging
from datetime import timezone
from typing import Callable, Dict, List, Optional
from defence360agent.contracts.config import (
ACRONIS,
ANTIVIRUS_MODE,
AcronisBackup as AcronisBackupConfig,
BackupConfig,
BackupRestore,
CLOUDLINUX,
CLOUDLINUX_ON_PREMISE,
CLUSTERLOGICS,
CPANEL,
Core,
DIRECTADMIN,
PLESK,
R1SOFT,
SAMPLE_BACKEND,
)
from defence360agent.contracts.license import LicenseCLN
from defence360agent.internals.cln import BackupNotFound, RestCLN
from defence360agent.subsys.panels.cpanel.panel import cPanel
from defence360agent.subsys.panels.directadmin.panel import DirectAdmin
from defence360agent.subsys.panels.plesk.panel import Plesk
if not ANTIVIRUS_MODE:
from restore_infected import backup_backends
from restore_infected.backup_backends.acronis import BackupFailed
from restore_infected.backup_backends_lib import (
BackendNonApplicableError,
BackendNotAuthorizedError,
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def get_backend(name):
    """Instantiate the backup backend registered under *name*.

    The lookup is done in the registry returned by
    ``_get_avalible_backends(include_sample=True)``, so the sample backend
    is included.

    :param name: registry key of the backend
    :return: a new instance of the registered backend class
    :raises ValueError: if a ``KeyError`` (e.g. unknown *name*) or a
        ``BackendNonApplicableError`` is raised while looking up or
        constructing the backend
    """
    try:
        return _get_avalible_backends(include_sample=True)[name]()
    except (KeyError, BackendNonApplicableError) as e:
        # Chain explicitly so the traceback reports the original error as
        # the direct cause of the ValueError.
        raise ValueError(
            "Backup system is not available: {}".format(name)
        ) from e
def get_available_backends_names() -> List[str]:
    """Return the names of the backends that can be instantiated.

    Every candidate class is constructed once as a probe; a candidate whose
    constructor raises ``BackendNonApplicableError`` is left out.
    """
    # Don't list the CL Backup as available for selection
    candidates = _get_avalible_backends(include_cl=False)
    usable = []
    for backend_name, backend_cls in candidates.items():
        try:
            backend_cls()
        except BackendNonApplicableError:
            continue
        usable.append(backend_name)
    return usable
def _get_avalible_backends(
    include_sample=False,
    include_cl=True,
) -> Dict[str, Callable]:
    """Build the mapping of backend name to backend class.

    :param include_sample: also register the sample backend
    :param include_cl: allow the CloudLinux backend to be registered (it is
        still subject to ``BackupRestore.CL_BACKUP_ALLOWED``)
    :return: dict of backend name -> class; at most one control panel
        backend is registered, for the first panel reported as installed
    """
    registry = {
        ACRONIS: Acronis,
        R1SOFT: R1Soft,
        # https://cloudlinux.atlassian.net/browse/DEF-8806
        # CLUSTERLOGICS: ClusterLogics,
    }

    if BackupRestore.CL_BACKUP_ALLOWED and include_cl:
        registry[CLOUDLINUX] = CloudLinux

    if BackupRestore.CL_ON_PREMISE_BACKUP_ALLOWED:
        registry[CLOUDLINUX_ON_PREMISE] = CloudLinuxOnPremise

    # Panels are probed in this order; the first installed one wins.
    panel_backends = (
        (cPanel, CPANEL, cPanelBackup),
        (Plesk, PLESK, PleskBackup),
        (DirectAdmin, DIRECTADMIN, DirectAdminBackup),
    )
    for panel, backend_name, backend_cls in panel_backends:
        if panel.is_installed():
            registry[backend_name] = backend_cls
            break

    if include_sample:
        registry[SAMPLE_BACKEND] = Sample

    return registry
def get_current_backend() -> Optional[str]:
    """Return the configured backup system name when backups are enabled.

    Reads the ``BACKUP_SYSTEM`` section of the backup config. When the
    ``enabled`` value is falsy, that value is returned as-is (so the result
    may be ``None`` or ``False``); otherwise the ``backup_system`` value is
    returned.
    """
    section = BackupConfig().config_to_dict().get("BACKUP_SYSTEM", {})
    enabled = section.get("enabled")
    if not enabled:
        return enabled
    return section.get("backup_system")
async def get_last_backup_timestamp() -> Optional[int]:
    """Return the last backup timestamp reported by the current backend.

    :return: ``None`` when no backend is currently enabled, otherwise
        whatever the backend's ``get_last_backup_timestamp`` returns
    :raises ValueError: propagated from ``get_backend`` when the configured
        backend is not available
    """
    current = get_current_backend()
    if current:
        instance = get_backend(current)  # type: BackupSystem
        return await instance.get_last_backup_timestamp()
    return None
def transactional(f):
    """Make an async method record its outcome in the backups config.

    The decorated coroutine method is awaited; afterwards
    ``_update_backups_config`` is called on the instance with
    ``enabled=True`` if the method returned normally, or ``enabled=False``
    if it raised (the exception then propagates to the caller).

    :param f: coroutine function taking the instance as first argument; the
        instance must provide ``_update_backups_config(enabled=...)``
    :return: wrapping coroutine function that returns the result of *f*
    """

    # Preserve the wrapped method's name, docstring and signature metadata.
    @functools.wraps(f)
    async def wrapper(self, *args, **kwargs):
        ok = False
        try:
            rv = await f(self, *args, **kwargs)
            ok = True
        finally:
            # Runs on success and on any exception alike.
            self._update_backups_config(enabled=ok)
        return rv

    return wrapper
class BackupException(Exception):
    """Error raised by this module when making a backup fails."""
class BackupSystem:
    """Base class of the backup systems defined in this module.

    The default implementation only records in the ``BACKUP_SYSTEM`` config
    section whether this system is the enabled one; the remaining methods
    are no-op defaults for subclasses to override.
    """

    def __init__(self, name, log_path=None):
        """Store the backup system *name* and an optional *log_path*."""
        self.name = name
        self.log_path = log_path

    def _update_backups_config(self, enabled):
        """Write the ``BACKUP_SYSTEM`` section for this backup system.

        When *enabled* is falsy the stored ``backup_system`` is ``None``.
        """
        backup_system = self.name if enabled else None
        section = {"enabled": enabled, "backup_system": backup_system}
        BackupConfig().dict_to_config(
            {"BACKUP_SYSTEM": section}, overwrite=True, validate=True
        )

    async def init(self, *args, **kwargs):
        """Mark this backup system as enabled; arguments are ignored."""
        self._update_backups_config(enabled=True)

    async def disable(self, delete_backups=False):
        """Mark backups as disabled; *delete_backups* is ignored here."""
        self._update_backups_config(enabled=False)

    async def check(self):
        """Return an empty dict (default implementation)."""
        return {}

    async def show(self):
        """Return an empty dict (default implementation)."""
        return {}

    async def make_backup(self):
        """Do nothing (default implementation)."""
        return None

    async def check_state(self) -> bool:
        """Tell whether the config says this backup system is enabled.

        A falsy ``enabled`` config value is returned as-is, so the result is
        not always a strict ``bool``.
        """
        section = BackupConfig().config_to_dict().get("BACKUP_SYSTEM", {})
        enabled = section.get("enabled")
        if not enabled:
            return enabled
        return section.get("backup_system") == self.name

    async def get_last_backup_timestamp(self) -> Optional[int]:
        """Return ``None`` (default implementation)."""
        return None
class PleskBackup(BackupSystem):
    """Backup system registered under the ``PLESK`` name."""

    def __init__(self):
        super().__init__(name=PLESK)
class cPanelBackup(BackupSystem):
    """Backup system registered under the ``CPANEL`` name."""

    def __init__(self):
        super().__init__(name=CPANEL)
class DirectAdminBackup(BackupSystem):
    """Backup system registered under the ``DIRECTADMIN`` name."""

    def __init__(self):
        super().__init__(name=DIRECTADMIN)
class R1Soft(BackupSystem):
    """Backup system that delegates to the async ``r1soft`` backend."""

    def __init__(self):
        super().__init__(name=R1SOFT)
        self.backend = backup_backends.backend("r1soft", async_=True)

    async def show(self) -> dict:
        """Return the backend info limited to username, timestamp and ip."""
        visible = ("username", "timestamp", "ip")
        info = await self.backend.info()
        shown = {}
        for key, value in info.items():
            if key in visible:
                shown[key] = value
        return shown

    @transactional
    async def init(self, ip, username, password, encryption_key, **kwargs):
        """Initialise the backend; extra keyword arguments are ignored.

        The backups config is updated with the outcome by ``transactional``.
        """
        await self.backend.init(ip, username, password, encryption_key)
class ClusterLogics(BackupSystem):
    """Backup system that delegates to the async ClusterLogics backend."""

    def __init__(self):
        super().__init__(CLUSTERLOGICS)
        self.backend = backup_backends.backend(CLUSTERLOGICS, async_=True)

    async def show(self) -> dict:
        """Return the backend info limited to username, url and apikey."""
        info_data = await self.backend.info()
        return {
            k: v
            for k, v in info_data.items()
            if k in ("username", "url", "apikey")
        }

    @transactional
    async def init(self, **kwargs):
        """Initialise the backend with *kwargs*, minus ``force``.

        The backups config is updated with the outcome by ``transactional``.
        """
        # 'force' argument (for acronis only) has default value
        # also, need to use default value for 'url',
        # assigned inside backend.init
        # Tolerate callers that do not pass 'force' at all instead of
        # failing with KeyError.
        kwargs.pop("force", None)
        await self.backend.init(**kwargs)
class Sample(BackupSystem):
    """Backup system that delegates to the async sample backend."""

    def __init__(self):
        super().__init__(name=SAMPLE_BACKEND)
        # The backend is looked up by this system's own name.
        self.backend = backup_backends.backend(self.name, async_=True)
class Acronis(BackupSystem):
    """Backup system that delegates to the async Acronis backend."""

    def __init__(self):
        log_location = (Core.PRODUCT, AcronisBackupConfig.LOG_NAME)
        super().__init__(ACRONIS, "/var/log/%s/%s" % log_location)
        self.backend = backup_backends.backend(self.name, async_=True)

    async def show(self) -> dict:
        """Return the backend info limited to username and timestamp."""
        visible = ("username", "timestamp")
        info = await self.backend.info()
        shown = {}
        for key, value in info.items():
            if key in visible:
                shown[key] = value
        return shown

    @transactional
    async def init(self, username, password, force=False, **kwargs):
        """Initialise the backend with the given credentials.

        Provisioning is requested only when the backend reports that the
        agent is not installed. Extra keyword arguments are ignored. The
        backups config is updated with the outcome by ``transactional``.
        """
        agent_installed = await self.backend.is_agent_installed()
        await self.backend.init(
            username,
            password,
            provision=not agent_installed,
            force=force,
            tmp_dir=Core.TMPDIR,
        )

    async def _list_backups(self, until=None):
        """Return the backend's backups, passing *until* through."""
        return await self.backend.backups(until)

    async def get_last_backup_timestamp(self) -> Optional[int]:
        """Return the newest backup's timestamp, or ``None`` without backups.

        Each backup's ``created`` value is stamped with UTC (no conversion)
        before being turned into a POSIX timestamp.
        """
        backups = await self._list_backups()
        if not backups:
            return None
        newest = max(
            item.created.replace(tzinfo=timezone.utc).timestamp()
            for item in backups
        )
        return int(newest)

    async def check_state(self) -> bool:
        """If a backup exists, then the state is OK.

        Cancellation and ``BackendNotAuthorizedError`` are re-raised; any
        other exception is logged and reported as ``False``.
        """
        try:
            has_backups = bool(await self._list_backups())
        except (asyncio.CancelledError, BackendNotAuthorizedError):
            raise
        except Exception:
            logger.exception("Error during checking state")
            return False
        else:
            return has_backups
class CloudLinuxBase(Acronis):
    """Behaviour shared by the CloudLinux variants of the Acronis system."""

    async def show(self) -> dict:
        """Return the backend info with usage renamed and a login URL added.

        The ``usage`` entry is moved to ``backup_space_used_bytes`` and
        ``login_url`` is filled from the backend.

        :raises KeyError: if the backend info has no ``usage`` entry
        """
        info_data = await self.backend.info()
        info_data["backup_space_used_bytes"] = info_data.pop("usage")
        info_data["login_url"] = await self.backend.login_url()
        return info_data

    async def make_backup(self):
        """Ask the backend to make the initial backup.

        :raises BackupException: if the backend raises ``BackupFailed``; the
            message is the original error text, or ``"BackupFailed"`` when
            the original error carries no (truthy) first argument
        """
        logger.info("Making backup")
        try:
            await self.backend.make_initial_backup_strict()
        except BackupFailed as e:
            # Log through the module logger, like the rest of this module,
            # rather than through the root logger.
            logger.exception("CloudLinux backup failed")
            message = str(e) if e.args and e.args[0] else "BackupFailed"
            # Chain so the backend error stays attached as the cause.
            raise BackupException(message) from e

    async def get_backup_progress(self) -> Optional[int]:
        """Return the backup progress as reported by the backend."""
        return await self.backend.get_backup_progress()

    async def init(self, username, password, force=False, **kwargs):
        """Initialise the backend with the given credentials.

        Provisioning is requested only when the backend reports that the
        agent is not installed. Extra keyword arguments are ignored. This
        override is not wrapped in ``transactional`` itself.
        """
        logger.info("Starting %s init", self.name)
        provision = not await self.backend.is_agent_installed()
        await self.backend.init(
            username,
            password,
            provision=provision,
            force=force,
            tmp_dir=Core.TMPDIR,
        )
class CloudLinux(CloudLinuxBase):
    """CloudLinux backup system whose credentials are obtained from CLN.

    Most operations are retried once, after re-initialising with
    ``force=True``, when the backend raises ``BackendNotAuthorizedError``.
    """

    # Values of the "status" field returned by check()
    PAID = "paid"
    UNPAID = "unpaid"

    def __init__(self):
        super().__init__()
        # Replace the name assigned by the parent constructor.
        self.name = CLOUDLINUX

    @transactional
    async def init(self, force=False, **kwargs):
        """Fetch credentials for this server from CLN and initialise with them.

        Extra keyword arguments are ignored. The backups config is updated
        with the outcome by ``transactional``.
        """
        server_id = LicenseCLN.get_server_id()
        credentials = await RestCLN.acronis_credentials(server_id=server_id)
        login = credentials["login"]
        password = credentials["password"]
        await super().init(login, password, force=force)

    class Decorators:
        """Namespace for the method decorators used in this class body."""

        @staticmethod
        def update_credentials_on_unauthorized_error(f):
            """Retry *f* once after ``init(force=True)`` on an auth error."""

            @functools.wraps(f)
            async def wrapped(self, *args, **kwargs):
                try:
                    result = await f(self, *args, **kwargs)
                except BackendNotAuthorizedError:
                    await self.init(force=True)
                    result = await f(self, *args, **kwargs)
                return result

            return wrapped

    @Decorators.update_credentials_on_unauthorized_error
    async def show(self) -> dict:
        """Return the parent's info plus the purchased size and resize URL."""
        info = await super().show()
        # FIXME: raise exception when server_id is None
        server_id = LicenseCLN.get_server_id()
        check_result = await RestCLN.acronis_check(server_id=server_id)
        info["purchased_backup_gb"] = check_result.get("size", 0)
        info["resize_url"] = check_result.get("url")
        return info

    @Decorators.update_credentials_on_unauthorized_error
    async def make_backup(self):
        """Make a backup via the parent implementation, with auth retry."""
        await super().make_backup()

    @Decorators.update_credentials_on_unauthorized_error
    async def get_backup_progress(self) -> Optional[int]:
        """Return the parent's backup progress, with auth retry."""
        return await super().get_backup_progress()

    @Decorators.update_credentials_on_unauthorized_error
    async def get_last_backup_timestamp(self) -> Optional[int]:
        """Return the parent's last backup timestamp, with auth retry."""
        return await super().get_last_backup_timestamp()

    @Decorators.update_credentials_on_unauthorized_error
    async def check_state(self) -> bool:
        """Return the parent's state check result, with auth retry."""
        return await super().check_state()

    async def check(self) -> dict:
        """Report whether a backup is known to CLN for this server.

        :return: ``{"status": PAID, "size": ...}`` when the CLN check
            succeeds, or ``{"status": UNPAID, "url": ...}`` when it raises
            ``BackupNotFound``
        """
        server_id = LicenseCLN.get_server_id()
        try:
            content = await RestCLN.acronis_check(server_id=server_id)
        except BackupNotFound as e:
            return {"status": self.UNPAID, "url": e.add_used_space()}
        else:
            return {"status": self.PAID, "size": content.get("size")}

    async def disable(self, delete_backups=False):
        """Disable this backup system and optionally remove it in CLN.

        :param delete_backups: when truthy, also call
            ``RestCLN.acronis_remove`` for this server
        """
        await super().disable()
        if not delete_backups:
            return
        await RestCLN.acronis_remove(server_id=LicenseCLN.get_server_id())
class CloudLinuxOnPremise(CloudLinuxBase):
    """On-premise CloudLinux backup system; credentials come from the caller."""

    def __init__(self):
        super().__init__()
        # Replace the name assigned by the parent constructor.
        self.name = CLOUDLINUX_ON_PREMISE

    @transactional
    async def init(self, *args, **kwargs):
        """Run the parent ``init`` unchanged, recording the outcome.

        The backups config is updated with the outcome by ``transactional``.
        """
        await super().init(*args, **kwargs)