[ Avaa Bypassed ]




Upload:

Command:

hmhc3928@52.14.176.111: ~ $
import os
from typing import Tuple, List, Optional, Any, Dict  # NOQA

from vendors_api import CONFIG_PATH
from vendors_api.exceptions import NotFound, NotImplementedByVendor

from clcommon.cpapi.GeneralPanel import GeneralPanelPluginV1, PHPDescription
from clcommon.cpapi.cpapiexceptions import NoDBAccessData, NotSupported, NoDomain
from vendors_api.parser import PublicApi

from clcommon.features import ALL_CL_FEATURES, Feature


class PanelPlugin(GeneralPanelPluginV1):
    def __init__(self):
        """
        Initialize the base plugin and create the vendor API client.
        """
        super().__init__()
        # Every other method of this class reads panel data through this client.
        self._api = PublicApi()

    def getCPName(self):
        """
        Return the control panel name reported by the vendor API.
        :return: panel name, or None when nothing exists at CONFIG_PATH
        """
        if os.path.exists(CONFIG_PATH):
            return self._api.panel_info().name
        return None

    def get_cp_description(self):
        """
        Describe the control panel using data from the vendor API.
        :return: dict with 'name', 'version' and 'additional_info' (always None),
            or None when nothing exists at CONFIG_PATH
        """
        if os.path.exists(CONFIG_PATH):
            panel = self._api.panel_info()
            description = {}
            description['name'] = panel.name
            description['version'] = panel.version
            description['additional_info'] = None
            return description
        return None

    def admin_packages(self, raise_exc=False):
        main_admin = self._api.admins(is_main=True)[0]
        return [pack.name for pack in self._api.packages(owner=main_admin.name)]

    def resellers_packages(self, raise_exc=False):
        result = {}
        main_admin = self._api.admins(is_main=True)[0]
        for pack in self._api.packages():
            if pack.owner == main_admin.name:
                continue

            if pack.owner not in result:
                result[pack.owner] = []
            result[pack.owner].append(pack.name)
        return result

    def get_uids_list_by_package(self, package_name, reseller_name=None):
        main_admin = self._api.admins(is_main=True)[0]
        try:
            users = self._api.users(
                package_name=package_name,
                package_owner=reseller_name or main_admin.name,
                fields=['id']
            )
        except NotFound:
            return []
        return [str(u.id) for u in users]

    def admins(self):
        admins = self._api.admins()
        return [admin.unix_user for admin in admins if admin.unix_user]

    def resellers(self):
        resellers = self._api.resellers()
        return tuple(r.name for r in resellers)

    def is_reseller(self, username):
        return username in self.resellers()

    def db_access(self):
        try:
            db = self._api.db_info().mysql
        except NotImplementedByVendor as e:
            raise NoDBAccessData(f'db_info is not implemented by vendor: `{e}`') from e

        if db is None:
            raise NoDBAccessData('db_access is not supported by this control panel')

        access = {}
        access['login'] = db.access.login
        access['pass'] = db.access.password
        access['host'] = db.access.host
        access['port'] = db.access.port
        return access

    def dblogin_cplogin_pairs(self, cplogin_lst=None, with_system_users=False):
        try:
            db = self._api.db_info().mysql
        except NotImplementedByVendor as e:
            raise NotSupported(f'db_info is not implemented by vendor: `{e}`') from e

        if db is None:
            raise NotSupported('dblogin_cplogin_pairs is not supported by this control panel')

        result = []
        for sys_user, db_users in db.mapping.items():
            if cplogin_lst and sys_user not in cplogin_lst:
                continue

            for db_user in db_users:
                result.append([db_user, sys_user])
        return tuple(result)

    def _convert_by_mapping(self, objects, mapping, keyls):
        def _get_key_nested(token, obj):
            keys = token.split('.')
            for key in keys:
                obj = getattr(obj, key, None)
            return obj

        result = []
        for user in objects:
            as_array = []
            for key in keyls:
                if key not in mapping:
                    value = None
                else:
                    value = _get_key_nested(mapping[key], user)
                as_array.append(value)
            result.append(as_array)

        return result

    def _sys_users_info(self, sys_login, keyls):
        # type: (Optional[str], Tuple[str]) -> List[Tuple]
        mapping = {
            'cplogin': 'username',
            'mail': 'email',
            'reseller': 'owner',
            'dns': 'domain',
            'locale': 'locale_code',
            'package': 'package.name'
        }

        try:
            users = self._api.users(filter_names=sys_login)
        except NotFound:
            return []

        return self._convert_by_mapping(users, mapping, keyls)

    def _resellers_info(self, sys_login, keyls):
        # type: (Optional[str], Tuple[str]) -> List[Tuple]
        mapping = {
            'cplogin': 'name',
            'mail': 'email',
            'locale': 'locale_code',
        }

        try:
            resellers = self._api.resellers(filter_names=sys_login)
        except NotFound:
            resellers = []

        try:
            admins = self._api.admins(filter_names=sys_login)
        except NotFound:
            admins = []

        return self._convert_by_mapping(resellers + admins, mapping, keyls)

    def cpinfo(self, cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns'),
               search_sys_users=True):
        if search_sys_users:
            return self._sys_users_info(cpuser, keyls)
        else:
            return self._resellers_info(cpuser, keyls)

    def list_users(self, raise_exc=False):
        users = self._api.users(fields=['id', 'package', 'owner'])
        result = {}
        for user in users:
            result[user.id] = {
                'package': getattr(user.package, 'name', None),
                'reseller': user.owner
            }
        return result

    def get_reseller_users(self, reseller):
        try:
            users = self._api.users(owner=reseller, fields=['id', 'package', 'owner'])
        except NotFound:
            return {}
        return {
            user.id: {'package': getattr(user.package, 'name', None), 'reseller': user.owner}
            for user in users
        }

    def list_all(self, raise_exc=False):
        users = self._api.users(fields=['id', 'package'])
        return {
            user.id: getattr(user.package, 'name', None)
            for user in users
        }

    def reseller_package_by_uid(self, user_id):
        try:
            user = self._api.users(unix_id=user_id, fields=['owner', 'package'])[0]
        except (NotFound, IndexError):
            return '', ''
        return user.owner, getattr(user.package, 'name', None)

    def get_admin_emails_list(self):
        # see get_admin_email in __init__
        # we do not care how much admins exist
        # in control panel now
        # so just return one
        main_admin = self._api.admins(is_main=True)[0]
        return [main_admin.email]

    def docroot(self, domain):
        try:
            domain = self._api.domains(name=domain)[domain]
        except (NotFound, KeyError) as e:
            raise NoDomain(f"Can't obtain document root for domain '{domain}'") from e
        return domain.document_root, domain.owner

    @staticmethod
    def useraliases(cpuser, domain):
        """
        Return aliases from user domain
        :param str|unicode cpuser: user login
        :param str|unicode domain:
        :return list of aliases
        """
        return []

    def userdomains(self, cpuser):
        try:
            domains = self._api.domains(owner=cpuser)
        except NotFound:
            return []

        result = []
        # main domain must be first
        sorted_domains = sorted(list(domains.items()), key=lambda __d: not __d[1].is_main)
        for domain, info in sorted_domains:
            result.append((domain, info.document_root))
        return result

    def reseller_users(self, resellername=None):
        try:
            return [
                user.username for user in self._api.users(
                    owner=resellername, fields=['username'])
            ]
        except NotFound:
            return []

    def reseller_domains(self, resellername=None):
        try:
            users = self.reseller_users(resellername)
            return dict(self.cpinfo(users, keyls=('cplogin', 'dns')))
        except NotFound:
            return {}

    def get_user_login_url(self, domain):
        url_template = self._api.panel_info().user_login_url
        if url_template is None:
            return url_template
        return url_template.format(domain=domain)

    def get_reseller_id_pairs(self):
        return {r.name: r.id for r in self._api.resellers()}

    def get_admin_locale(self):
        main_admin = self._api.admins(is_main=True)[0]
        return main_admin.locale_code

    def get_unsupported_cl_features(self) -> tuple[Feature, ...]:
        """
        Return the features that must be treated as unsupported on this panel.
        :return: tuple of Feature members; LVE is included or excluded
            according to is_feature_lve_supported(), whatever the panel reports
        """
        enabled_by_feature = self._api.panel_info().supported_cl_features

        # when hoster does not define list of supported features
        # we assume that all of them are supported (LVE is still live-checked)
        if enabled_by_feature is None:
            return tuple() if self.is_feature_lve_supported() else (Feature.LVE, )

        # we should show only features that are explicitly enabled by hoster,
        # that is why a feature missing in the map counts as disabled
        disabled = {
            feature for feature in ALL_CL_FEATURES
            if not enabled_by_feature.get(feature, False)
        }

        # LVE is critical for many other modules, so there is a live-check
        # that overrides whatever the hoster declared
        if self.is_feature_lve_supported():
            disabled.discard(Feature.LVE)
        else:
            disabled.add(Feature.LVE)

        # to not make all vendors edit their custom scripts with a new
        # parameter: autotracing follows X-Ray when X-Ray is enabled
        if Feature.XRAY not in disabled:
            disabled.discard(Feature.AUTOTRACING)

        return tuple(disabled)

    def is_feature_lve_supported(self):
        """
        Live-check whether LVE can be used on this server.
        :return: True only when /usr/sbin/lvectl exists, is_container() is
            false and is_cl_solo_edition(skip_jwt_check=True) is false
        """
        from clcommon.lib.cledition import (  # pylint: disable=import-outside-toplevel
            is_cl_solo_edition,
            is_container,
        )

        # all three checks are always evaluated, in this order
        lvectl_present = os.path.exists('/usr/sbin/lvectl')
        in_container = is_container()
        solo_edition = is_cl_solo_edition(skip_jwt_check=True)
        return lvectl_present and not in_container and not solo_edition

    def get_domains_php_info(self):
        domains = self._api.domains(with_php=True)

        domains_php_info = {}
        for domain, domain_info in domains.items():

            handler_type = 'fpm' if domain_info.php.fpm \
                else 'cgi'

            if domain_info.php.handler:
                handler_type = domain_info.php.handler

            php_version_full = domain_info.php.php_version_id
            if not php_version_full:
                continue

            domains_php_info[domain] = {
                'username': domain_info.owner,
                'php_version_id': php_version_full,
                'handler_type': handler_type,
                'display_version': php_version_full
            }

        return domains_php_info

    def get_installed_php_versions(self):
        phps = self._api.php()

        php_description = []
        for php in phps:
            php_description.append(PHPDescription(
                identifier=php.identifier,
                version=f'{php.identifier[-2]}.{php.identifier[-1]}',
                dir=php.dir,
                modules_dir=php.modules_dir,
                bin=php.bin,
                ini=php.ini,
            ))

        return php_description

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 23 B 0644
backward_plugin.py File 10.88 KB 0644
cpanel.py File 39.04 KB 0644
directadmin.py File 39.79 KB 0644
interworx.py File 3.68 KB 0644
ispmanager.py File 9.64 KB 0644
nopanel.py File 7.73 KB 0644
plesk.py File 34.78 KB 0644
universal.py File 1.86 KB 0644
vendors.py File 11.45 KB 0644