"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import json
import logging
import os
from contextlib import suppress
from functools import partial
from itertools import chain
from typing import Callable
from peewee import SqliteDatabase, TextField
from defence360agent.contracts.config import LocalConfig
from imav.migration_utils.other import im360_present
from imav.migration_utils.plesk_sdk import PleskSdk
from imav.migration_utils.revisium import find_revisium_db
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Database handle created without a path; it is bound to the actual
# ex-Revisium database file later, by db.init() in
# migrate_imav4plesk_settings().
db = SqliteDatabase(None)
class KeyValue(db.Model):
    """
    ex-Revisium settings database model

    Each row holds one setting: its name in ``key`` and its stored value
    in ``value``. The model is attached to the module-level ``db`` handle.
    """

    # setting name; the Revisium extractor looks rows up by this column
    key = TextField()
    # stored setting value (text); SettingsExtractor.get() tries to
    # JSON-decode it after reading
    value = TextField()

    class Meta:
        # the model declares no primary key column
        primary_key = False
class SettingsExtractor:
    """
    Base class for ex-Revisium and Plesk settings extractors

    Subclasses implement :meth:`_get` to fetch the raw value stored under
    ``key``; :meth:`get` adds the fallback to ``default`` and JSON decoding.
    """

    def __init__(self, key, default):
        """
        :param key: name of the setting to read
        :param default: value returned by :meth:`get` when reading fails
        """
        self.key = key
        self.default = default

    def __repr__(self):
        # Extractors are formatted with %r in this module's warning messages;
        # show the concrete class and setting name instead of the default
        # "<... object at 0x...>" so the log identifies the failing setting.
        return (
            f"{type(self).__name__}"
            f"(key={self.key!r}, default={self.default!r})"
        )

    def _get(self):
        """Return the raw stored value for ``self.key`` (may raise)."""
        raise NotImplementedError

    def get(self):
        """
        Return the setting value, JSON-decoded when possible

        :return: ``self.default`` if :meth:`_get` raised (a warning is
            logged); otherwise the decoded JSON value, or the raw value
            unchanged when it cannot be decoded
        """
        try:
            value = self._get()
        except Exception as e:
            # best effort: any backend failure falls back to the default
            logger.warning("Fail to get %r value: %r", self.key, e)
            return self.default
        # Decoding is deliberately best effort: plain strings and values
        # that are not str/bytes (e.g. an int or bool) are returned as is.
        with suppress(Exception):
            value = json.loads(value)
        return value
class Revisium(SettingsExtractor):
    """Settings extractor backed by the ex-Revisium database"""

    def _get(self):
        """Return the ``value`` of the KeyValue row matching ``self.key``."""
        row = KeyValue.get(KeyValue.key == self.key)
        return row.value
class Plesk(SettingsExtractor):
    """Settings extractor for Plesk, using the PHP wrapper"""

    def _get(self):
        """Ask PleskSdk for ``self.key``, passing ``self.default`` along."""
        name, fallback = self.key, self.default
        return PleskSdk.settings__get(name, fallback)
class ConfigMapping:
    """
    Turn one or several dependent ex-Revisium settings into a single
    ImunifyAV/360 config value and assign that value to one or several
    config keys
    """

    def __init__(
        self,
        source: SettingsExtractor | tuple[SettingsExtractor, ...],
        target: str | tuple[str, ...],
        *,
        converter: Callable,
    ):
        """
        :param source: ex-Revisium / Plesk settings extractor(s)
        :param target: ImunifyAV/360 config parameter name(s) to fill
        :param converter: callable receiving the source value(s) as
            positional arguments and returning the target value
        """
        # a lone extractor / name is normalised to a one-element tuple
        self.source = source if isinstance(source, tuple) else (source,)
        self.target = target if isinstance(target, tuple) else (target,)
        self.converter = converter

    def convert(self):
        """
        Compute the target value and pair it with every target name

        :return: tuple of ``(target_name, value)`` pairs, or an empty tuple
            when the converter raised (a warning is logged in that case)
        """
        source_values = [extractor.get() for extractor in self.source]
        try:
            target_value = self.converter(*source_values)
        except Exception as e:
            logger.warning(
                "Fail to convert %r value(s) (%r): %r",
                self.source,
                source_values,
                e,
            )
            return ()
        else:
            return tuple((name, target_value) for name in self.target)
def clamp(minimum: int, maximum: int, value: int) -> int:
    """
    Limit a value to the ``minimum``..``maximum`` range

    :param minimum: lower bound of the result
    :param maximum: upper bound applied to ``value`` first
    :param value: the value to limit
    :return: ``value`` capped at ``maximum``, then raised to ``minimum``
    """
    capped = min(value, maximum)
    return max(minimum, capped)
def intensity_cpu(value, max_value) -> int:
    """
    Derive the ImunifyAV/360 CPU intensity from a value within a range

    The middle intensity (4) is scaled by ``value / max_value`` using
    integer division and the result is limited to 2..7. A zero
    ``max_value`` leaves the middle intensity unchanged.
    """
    try:
        scaled = 4 * value // max_value
    except ZeroDivisionError:
        # no usable range: keep the middle point of the CPU intensity
        scaled = 4
    return clamp(2, 7, scaled)
# Translates the Plesk "ra_auto_scan_period" setting into the value written
# to MALWARE_SCAN_SCHEDULE.interval. It is used through __getitem__ as a
# converter, so a period missing from this table raises KeyError there.
schedule_interval_mapping = {
    "never": "none",
    "daily": "day",
    "weekly": "week",
    "monthly": "month",
}
def get_max_possible_cpu() -> int:
    """Return half of the available CPUs/cores, but never less than one"""
    # os.cpu_count() may return None; treat that as a single CPU
    total = os.cpu_count() or 1
    half = total // 2
    return max(half, 1)
# We agreed to keep all default settings when Imunify360 is present, except
# for the scanning time: these are the only mappings applied in that case
# (see the im360_present() check in migrate_imav4plesk_settings).
scan_time_mapping_only = (
    # scan period name -> schedule interval name
    ConfigMapping(
        Plesk("ra_auto_scan_period", "monthly"),
        "MALWARE_SCAN_SCHEDULE.interval",
        converter=schedule_interval_mapping.__getitem__,
    ),
    # hour of the scheduled scan, limited to 0..23
    ConfigMapping(
        Plesk("ra_hour_auto_scan", 4),
        "MALWARE_SCAN_SCHEDULE.hour",
        converter=partial(clamp, 0, 23),
    ),
)
# Full set of mappings, applied when Imunify360 is not present: the mappings
# below followed by the scan schedule ones from scan_time_mapping_only.
all_mappings = (
    # worker count relative to the maximum possible worker count -> one CPU
    # intensity value assigned to both intensity parameters
    ConfigMapping(
        (
            Plesk("ra_max_worker_count", 2),
            Plesk("ra_max_possible_worker_count", get_max_possible_cpu()),
        ),
        (
            "MALWARE_SCAN_INTENSITY.cpu",
            "MALWARE_SCAN_INTENSITY.user_scan_cpu",
        ),
        converter=intensity_cpu,
    ),
    # backup retention in days, at least 1
    ConfigMapping(
        Plesk("ra_keep_backups_days", 7),
        "MALWARE_CLEANUP.keep_original_files_days",
        converter=partial(max, 1),
    ),
    # converted with bool()
    ConfigMapping(
        Plesk("ra_trim_files", True),
        "MALWARE_CLEANUP.trim_file_instead_of_removal",
        converter=bool,
    ),
    # the only setting read from the ex-Revisium database rather than Plesk
    ConfigMapping(
        Revisium("ra_use_ignore_list_by_user", True),
        "PERMISSIONS.user_ignore_list",
        converter=bool,
    ),
) + scan_time_mapping_only
def migrate_imav4plesk_settings(database=None):
    """
    Build an ImunifyAV/360 config dict from ex-Revisium / Plesk settings

    :param database: ex-Revisium settings database that the module-level
        ``db`` handle is initialised with
    :return: nested dict keyed by config section and parameter name, e.g.
        ``{"MALWARE_SCAN_SCHEDULE": {"hour": 4}}``; mappings whose
        conversion failed contribute nothing
    """
    db.init(database)
    try:
        # convert strings like `MALWARE_SCAN_INTENSITY.cpu` to dicts and
        # join them
        config = {}
        # with Imunify360 present only the scan schedule is migrated
        mappings = scan_time_mapping_only if im360_present() else all_mappings
        for target, value in chain.from_iterable(
            m.convert() for m in mappings
        ):
            *path, param = target.split(".")
            section = config
            for key in path:
                section = section.setdefault(key, {})
            section[param] = value
        return config
    finally:
        # Queries made by the extractors may have opened a connection on the
        # module-level handle; release it instead of leaking it. The
        # is_closed() guard also covers the uninitialised (database=None)
        # case, where close() itself would raise. A failure to close must
        # not discard an already built config, so it is only logged.
        try:
            if not db.is_closed():
                db.close()
        except Exception as e:
            logger.warning("Failed to close ex-Revisium database: %r", e)
def migrate(migrator, database, fake=False, **kwargs):
    """
    Migration entry point: copy legacy ImunifyAV for Plesk settings into
    the local config

    Does nothing for a fake run or when no legacy database is found. Any
    error during the migration is logged as a warning and not re-raised.
    """
    if fake:
        return None

    legacy_db_path = find_revisium_db()
    if legacy_db_path is None:
        logger.info("No legacy ImunifyAV database found. Skipping...")
        return None

    try:
        migrated_config = migrate_imav4plesk_settings(legacy_db_path)
        # an empty result means there is nothing to write
        if migrated_config:
            LocalConfig().dict_to_config(migrated_config, normalize=False)
    except Exception as e:
        logger.warning("Failed to migrate ImunifyAV for Plesk settings: %r", e)
    return None
def rollback(migrator, database, fake=False, **kwargs):
    """Rollback entry point; intentionally does nothing."""
    return None
# Manual run: print the config that would be migrated from the detected
# ex-Revisium database; nothing is written to the local config here.
if __name__ == "__main__":
    print(migrate_imav4plesk_settings(find_revisium_db()))