"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import logging
import time
from defence360agent.contracts.config import BackupConfig
from defence360agent.contracts.license import LicenseCLN
from defence360agent.internals.cln import CLNError
from defence360agent.rpc_tools import ValidationError
from defence360agent.rpc_tools.lookup import CommonEndpoints, bind, wraps
from defence360agent.subsys.backup_systems import (
Acronis,
BackupException,
CloudLinux,
CloudLinuxBase,
CloudLinuxOnPremise,
R1Soft,
get_available_backends_names,
get_backend,
)
from defence360agent.utils import Scope, Singleton
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# User-facing text reported when a CLN request fails with CLNError.
CLN_RESPONSE_ERROR = (
    "\n"
    "Error with one of the next reasons:\n"
    "* Not found linked backup for the server\n"
    "* Not found linked cloudlinux user for the server\n"
    "* IP of server is different in last time\n"
)

# Field name -> validation message for backends using username/password.
PLAIN_AUTH_FIELDS = dict(
    username="Username should be provided",
    password="Password should be provided",
)

# Backend -> mapping of mandatory argument names to the message reported
# when that argument is missing. Backends not listed here require nothing.
REQUIRED_FIELDS = {
    Acronis: PLAIN_AUTH_FIELDS,
    R1Soft: dict(
        encryption_key="Encryption key should be provided",
        ip="IP should be provided",
        **PLAIN_AUTH_FIELDS,
    ),
    CloudLinuxOnPremise: PLAIN_AUTH_FIELDS,
}
def validate_backend_args(f):
    """Decorate coroutine *f* so backend-specific arguments are checked.

    The wrapper looks up the ``backend`` keyword argument in
    ``REQUIRED_FIELDS`` and collects the message of every required field
    that is absent from (or falsy in) the keyword arguments.

    Raises:
        ValidationError: with all collected messages joined by newlines,
            when at least one required field is missing.
    """

    @wraps(f)
    async def wrapper(*args, **kwargs):
        field_messages = REQUIRED_FIELDS.get(kwargs["backend"], {})
        missing = []
        for name, message in field_messages.items():
            if kwargs.get(name):
                continue
            missing.append(message)
        if missing:
            raise ValidationError("\n".join(missing))
        return await f(*args, **kwargs)

    return wrapper
class BackupEndpoints(CommonEndpoints, metaclass=Singleton):
    """RPC endpoints of the ``backup-systems`` command group.

    Besides thin wrappers around the backend calls, the class keeps a small
    state machine (``NOT_RUNNING`` -> ``INIT`` -> ``BACKUP`` -> ``DONE``)
    describing the backend initialization and the initial backup that is
    started right after it.
    """

    SCOPE = Scope.IM360

    NOT_RUNNING, INIT, BACKUP, DONE = "not_running", "init", "backup", "done"

    def __init__(self, sink):
        super().__init__(sink)
        # one of NOT_RUNNING / INIT / BACKUP / DONE
        self._status = self.NOT_RUNNING
        # backend being initialized/used and the last error message (if any)
        self._current_backend = self._error = None
        # asyncio tasks running backend init and the initial backup
        self._init_task = self._backup_task = None
        # time.time() value taken when the initial backup was started
        self._backup_started_time = 0

    @property
    def _backup_pending(self):
        """True while less than 5 minutes passed since the backup start."""
        return time.time() - self._backup_started_time < 60 * 5

    @validate_backend_args
    @bind("backup-systems", "init")
    async def init(self, backend, **kwargs):
        """Start backend initialization in a background task.

        Returns a status message immediately; the outcome is recorded by
        ``_init_task_done``.

        Raises:
            ValidationError: in demo mode, or when an initialization or a
                backup is already in progress without a recorded error.
        """
        if LicenseCLN.is_demo():
            raise ValidationError("This action is not allowed in demo version")
        loop = asyncio.get_event_loop()
        if self._current_backend and not self._error:
            if self._status == self.INIT:
                raise ValidationError(
                    "Backup initialization is already in progress"
                )
            if self._status == self.BACKUP:
                raise ValidationError("Backup process is already in progress")
        logger.info("Starting init task")
        # flush error from previous sessions
        self._error = None
        # keep the task in the attribute declared in __init__
        self._init_task = loop.create_task(backend.init(**kwargs))
        self._init_task.add_done_callback(self._init_task_done)
        self._status = self.INIT
        self._current_backend = backend
        return "Backup initialization process is in progress"

    async def _set_current_backend(self, conf):
        """Pick the backend named in *conf* when none is selected yet.

        *conf* may be an empty dict (no ``BACKUP_SYSTEM`` section in the
        config), so the key is read with ``get`` instead of indexing.
        """
        backup_system = conf.get("backup_system")
        if not self._current_backend and backup_system:
            self._current_backend = get_backend(backup_system)
            if self._status == self.NOT_RUNNING:
                # if backup exists, assuming all done
                if await self._current_backend.check_state():
                    self._status = self.DONE
                else:
                    self._error = "No backups found!"

    async def _include_init_stages_info(self, status):
        """Extend *status* with backend details and backup progress.

        Updates and returns the same *status* dict; may switch the internal
        state to ``DONE``.
        """
        if status["state"] != self.INIT:
            advanced_data = await self._current_backend.show()
            status.update(advanced_data)
        # NOTE(review): the backend is tested with isinstance() here but is
        # compared with the CloudLinux class itself in _get_advanced_status;
        # confirm whether backends are classes or instances.
        if (
            isinstance(self._current_backend, CloudLinuxBase)
            and self._status == self.BACKUP
        ):
            if self._error is not None:
                if await self._current_backend.check_state():
                    # error should already be fixed,
                    # because there is some backups
                    self._error = None
                    status["state"] = self._status = self.DONE
            else:
                progress = await self._current_backend.get_backup_progress()
                if progress is None:
                    if self._backup_pending:
                        status["progress"] = 0
                    else:
                        # if there is no progress after a while
                        # let's admit backup is completed
                        status["state"] = self._status = self.DONE
                else:
                    status["progress"] = progress
        return status

    async def _get_advanced_status(self, status):
        """Fill *status* with state, error, log path and backend details.

        An error recorded during ``INIT`` is reported once in the returned
        dict and then the state is reset to ``NOT_RUNNING``.
        """
        await self._set_current_backend(status)
        status["state"] = self._status
        status["error"] = self._error
        status["log_path"] = getattr(self._current_backend, "log_path", None)
        status["backup_system"] = getattr(self._current_backend, "name", None)
        if self._error is not None and self._status == self.INIT:
            # the error is already copied to `status`; forget failed init
            self._error = self._current_backend = None
            self._status = self.NOT_RUNNING
        if self._current_backend:
            try:
                status = await self._include_init_stages_info(status)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                # Ignoring "No backup for host" errors for 5 minutes
                if "No backup for host" in str(e) and self._backup_pending:
                    logger.info(
                        "Error %s will be ignored for 5 minutes "
                        "after init state has been finished",
                        e,
                    )
                # want to show human friendly error if backup is unpaid
                elif self._current_backend == CloudLinux:
                    resp = await self._current_backend.check()
                    if resp.get("status") == CloudLinux.UNPAID:
                        status["error"] = (
                            "Backup is unpaid! Please, check "
                            "it out in your CLN account."
                        )
                    else:
                        # still not propagated, but no longer silent
                        logger.warning(
                            "Failed to get backup status details: %s", e
                        )
                else:
                    # bare raise keeps the original traceback intact
                    raise
        return status

    @bind("backup-systems", "extended-status")
    async def extended_status(self):
        """Return the backup config section enriched with runtime state."""
        status = BackupConfig().config_to_dict().get("BACKUP_SYSTEM", {})
        status = await self._get_advanced_status(status)
        return {"items": status}

    @bind("backup-systems", "status")
    async def status(self, user=None):
        """Return the ``BACKUP_SYSTEM`` config section; *user* is unused."""
        status = BackupConfig().config_to_dict().get("BACKUP_SYSTEM", {})
        return {"items": status}

    @bind("backup-systems", "list")
    async def list(self):
        """Return the names of the available backup backends."""
        return get_available_backends_names()

    @bind("backup-systems", "disable")
    async def disable(self, backend, **kwargs):
        """Reset the tracked state and disable *backend*."""
        self._current_backend = self._error = None
        self._status = self.NOT_RUNNING
        await backend.disable(**kwargs)

    @bind("backup-systems", "check")
    async def check(self, backend):
        """Return the result of ``backend.check()``.

        Raises:
            ValidationError: when the check fails with ``CLNError``,
                ``LookupError`` or ``RuntimeError``.
        """
        try:
            return {"items": await backend.check()}
        except CLNError as e:
            raise ValidationError(CLN_RESPONSE_ERROR) from e
        except (LookupError, RuntimeError) as e:
            raise ValidationError(str(e)) from e

    def _init_task_done(self, future):
        """Done-callback of the init task: record error or start backup."""
        logger.info("In init done callback")
        if future.cancelled():
            # Future.exception() raises CancelledError for a cancelled
            # future, which would leave the state stuck in INIT
            logger.warning("Backup init task was cancelled")
            self._error = "Backup initialization was cancelled"
            return
        e = future.exception()
        if e is not None:
            if isinstance(e, CLNError):
                self._error = CLN_RESPONSE_ERROR
            else:
                logger.exception("Backup init task failed", exc_info=e)
                self._error = str(e)
            return
        backend = self._current_backend
        if backend is None:
            # `disable` reset the state while the init task was running
            logger.info(
                "Backup backend was reset during init, "
                "initial backup is skipped"
            )
            return
        logger.info("Starting initial backup task")
        self._status = self.BACKUP
        self._backup_started_time = time.time()
        self._backup_task = asyncio.get_event_loop().create_task(
            backend.make_backup()
        )
        self._backup_task.add_done_callback(self._backup_task_done)

    def _backup_task_done(self, future):
        """Done-callback of the backup task: record error or mark DONE."""
        logger.info("In backup done callback")
        if future.cancelled():
            # see _init_task_done: exception() would raise CancelledError
            logger.warning("Initial backup task was cancelled")
            self._error = "Initial backup was cancelled"
            return
        e = future.exception()
        if e is not None:
            logger.exception("Initial backup task failed", exc_info=e)
            if isinstance(e, BackupException):
                self._error = str(e)
            else:
                self._error = "Unknown error: " + str(e)
        else:
            self._status = self.DONE