import asyncio import time from contextlib import suppress from datetime import timedelta from logging import getLogger from typing import Union from defence360agent.contracts.messages import MessageType from defence360agent.contracts.plugins import MessageSource from defence360agent.subsys.backup_systems import ( get_current_backend, get_last_backup_timestamp, ) from defence360agent.subsys.persistent_state import load_state, save_state from defence360agent.utils import Scope, recurring_check logger = getLogger(__name__) SEND_INTERVAL = int(timedelta(hours=24).total_seconds()) RECURRING_CHECK_INTERVAL = 5 class BackupInfoSender(MessageSource): """Send user backup statistics to CH periodically""" SCOPE = Scope.IM360 async def create_source(self, loop, sink): self._loop = loop self._sink = sink self._send_event = asyncio.Event() self._last_send_timestamp = self.load_last_send_timestamp() self._check_task = self._loop.create_task( self._recurring_check_data_to_send() ) self._send_stat_task = self._loop.create_task( self._recurring_send_stat() ) async def shutdown(self): for task in [self._check_task, self._send_stat_task]: task.cancel() with suppress(asyncio.CancelledError): await task self.save_last_send_timestamp() @staticmethod def is_valid_timestamp(timestamp: Union[int, float]) -> bool: return isinstance(timestamp, (int, float)) and timestamp > 0 def save_last_send_timestamp(self, ts: Union[int, float] = None): timestamp = self._last_send_timestamp if ts is None else ts if not self.is_valid_timestamp(timestamp): logger.warning("Invalid timestamp: %s", timestamp) return save_state("BackupInfoSender", {"last_send_timestamp": timestamp}) def load_last_send_timestamp(self): timestamp = load_state("BackupInfoSender").get("last_send_timestamp") if not self.is_valid_timestamp(timestamp): logger.warning("Invalid timestamp loaded, resetting to 0") timestamp = 0 return timestamp @recurring_check(RECURRING_CHECK_INTERVAL) async def _recurring_check_data_to_send(self): if time.time() - self._last_send_timestamp >= SEND_INTERVAL: self._send_event.set() @recurring_check(0) async def _recurring_send_stat(self): await self._send_event.wait() try: await self._send_server_config() except Exception as e: logger.exception("Failed to collect backup info: %s", e) finally: # Ensure backup info is not sent too frequently, even after an error self._last_send_timestamp = time.time() self._send_event.clear() async def _send_server_config(self): confg_msg = MessageType.BackupInfo( backup_provider_type=get_current_backend(), last_backup_timestamp=await get_last_backup_timestamp(), ) await self._sink.process_message(confg_msg)
Name | Type | Size | Permission | Actions |
---|---|---|---|---|
__pycache__ | Folder | 0755 |
|
|
__init__.py | File | 0 B | 0644 |
|
accumulate.py | File | 3.53 KB | 0644 |
|
backup_info_sender.py | File | 3.06 KB | 0644 |
|
check_license.py | File | 7.58 KB | 0644 |
|
checkpoint.py | File | 1.23 KB | 0644 |
|
client.py | File | 9.43 KB | 0644 |
|
config_merger.py | File | 828 B | 0644 |
|
config_watcher.py | File | 1.89 KB | 0644 |
|
event_hook_executor.py | File | 777 B | 0644 |
|
event_monitor.py | File | 3.32 KB | 0644 |
|
event_monitor_message_processor.py | File | 6.33 KB | 0644 |
|
files_recurring_update.py | File | 1.09 KB | 0644 |
|
icontact_sender.py | File | 4.36 KB | 0644 |
|
idle_time_out.py | File | 1.21 KB | 0644 |
|
lve_utils_install.py | File | 1.58 KB | 0644 |
|
myimunify.py | File | 1.43 KB | 0644 |
|
ping.py | File | 536 B | 0644 |
|
send_domain_list.py | File | 2.79 KB | 0644 |
|
send_server_config.py | File | 10.1 KB | 0644 |
|