import os
from typing import Tuple, List, Optional, Any, Dict # NOQA
from vendors_api import CONFIG_PATH
from vendors_api.exceptions import NotFound, NotImplementedByVendor
from clcommon.cpapi.GeneralPanel import GeneralPanelPluginV1, PHPDescription
from clcommon.cpapi.cpapiexceptions import NoDBAccessData, NotSupported, NoDomain
from vendors_api.parser import PublicApi
from clcommon.features import ALL_CL_FEATURES, Feature
class PanelPlugin(GeneralPanelPluginV1):
def __init__(self):
super().__init__()
self._api = PublicApi()
def getCPName(self):
if not os.path.exists(CONFIG_PATH):
return None
return self._api.panel_info().name
def get_cp_description(self):
if not os.path.exists(CONFIG_PATH):
return None
info = self._api.panel_info()
return {'name': info.name, 'version': info.version, 'additional_info': None}
def admin_packages(self, raise_exc=False):
main_admin = self._api.admins(is_main=True)[0]
return [pack.name for pack in self._api.packages(owner=main_admin.name)]
def resellers_packages(self, raise_exc=False):
result = {}
main_admin = self._api.admins(is_main=True)[0]
for pack in self._api.packages():
if pack.owner == main_admin.name:
continue
if pack.owner not in result:
result[pack.owner] = []
result[pack.owner].append(pack.name)
return result
def get_uids_list_by_package(self, package_name, reseller_name=None):
main_admin = self._api.admins(is_main=True)[0]
try:
users = self._api.users(
package_name=package_name,
package_owner=reseller_name or main_admin.name,
fields=['id']
)
except NotFound:
return []
return [str(u.id) for u in users]
def admins(self):
admins = self._api.admins()
return [admin.unix_user for admin in admins if admin.unix_user]
def resellers(self):
resellers = self._api.resellers()
return tuple(r.name for r in resellers)
def is_reseller(self, username):
return username in self.resellers()
def db_access(self):
try:
db = self._api.db_info().mysql
except NotImplementedByVendor as e:
raise NoDBAccessData(f'db_info is not implemented by vendor: `{e}`') from e
if db is None:
raise NoDBAccessData('db_access is not supported by this control panel')
access = {}
access['login'] = db.access.login
access['pass'] = db.access.password
access['host'] = db.access.host
access['port'] = db.access.port
return access
def dblogin_cplogin_pairs(self, cplogin_lst=None, with_system_users=False):
try:
db = self._api.db_info().mysql
except NotImplementedByVendor as e:
raise NotSupported(f'db_info is not implemented by vendor: `{e}`') from e
if db is None:
raise NotSupported('dblogin_cplogin_pairs is not supported by this control panel')
result = []
for sys_user, db_users in db.mapping.items():
if cplogin_lst and sys_user not in cplogin_lst:
continue
for db_user in db_users:
result.append([db_user, sys_user])
return tuple(result)
def _convert_by_mapping(self, objects, mapping, keyls):
def _get_key_nested(token, obj):
keys = token.split('.')
for key in keys:
obj = getattr(obj, key, None)
return obj
result = []
for user in objects:
as_array = []
for key in keyls:
if key not in mapping:
value = None
else:
value = _get_key_nested(mapping[key], user)
as_array.append(value)
result.append(as_array)
return result
def _sys_users_info(self, sys_login, keyls):
# type: (Optional[str], Tuple[str]) -> List[Tuple]
mapping = {
'cplogin': 'username',
'mail': 'email',
'reseller': 'owner',
'dns': 'domain',
'locale': 'locale_code',
'package': 'package.name'
}
try:
users = self._api.users(filter_names=sys_login)
except NotFound:
return []
return self._convert_by_mapping(users, mapping, keyls)
def _resellers_info(self, sys_login, keyls):
# type: (Optional[str], Tuple[str]) -> List[Tuple]
mapping = {
'cplogin': 'name',
'mail': 'email',
'locale': 'locale_code',
}
try:
resellers = self._api.resellers(filter_names=sys_login)
except NotFound:
resellers = []
try:
admins = self._api.admins(filter_names=sys_login)
except NotFound:
admins = []
return self._convert_by_mapping(resellers + admins, mapping, keyls)
def cpinfo(self, cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns'),
search_sys_users=True):
if search_sys_users:
return self._sys_users_info(cpuser, keyls)
else:
return self._resellers_info(cpuser, keyls)
def list_users(self, raise_exc=False):
users = self._api.users(fields=['id', 'package', 'owner'])
result = {}
for user in users:
result[user.id] = {
'package': getattr(user.package, 'name', None),
'reseller': user.owner
}
return result
def get_reseller_users(self, reseller):
try:
users = self._api.users(owner=reseller, fields=['id', 'package', 'owner'])
except NotFound:
return {}
return {
user.id: {'package': getattr(user.package, 'name', None), 'reseller': user.owner}
for user in users
}
def list_all(self, raise_exc=False):
users = self._api.users(fields=['id', 'package'])
return {
user.id: getattr(user.package, 'name', None)
for user in users
}
def reseller_package_by_uid(self, user_id):
try:
user = self._api.users(unix_id=user_id, fields=['owner', 'package'])[0]
except (NotFound, IndexError):
return '', ''
return user.owner, getattr(user.package, 'name', None)
def get_admin_emails_list(self):
# see get_admin_email in __init__
# we do not care how much admins exist
# in control panel now
# so just return one
main_admin = self._api.admins(is_main=True)[0]
return [main_admin.email]
def docroot(self, domain):
try:
domain = self._api.domains(name=domain)[domain]
except (NotFound, KeyError) as e:
raise NoDomain(f"Can't obtain document root for domain '{domain}'") from e
return domain.document_root, domain.owner
@staticmethod
def useraliases(cpuser, domain):
"""
Return aliases from user domain
:param str|unicode cpuser: user login
:param str|unicode domain:
:return list of aliases
"""
return []
def userdomains(self, cpuser):
try:
domains = self._api.domains(owner=cpuser)
except NotFound:
return []
result = []
# main domain must be first
sorted_domains = sorted(list(domains.items()), key=lambda __d: not __d[1].is_main)
for domain, info in sorted_domains:
result.append((domain, info.document_root))
return result
def reseller_users(self, resellername=None):
try:
return [
user.username for user in self._api.users(
owner=resellername, fields=['username'])
]
except NotFound:
return []
def reseller_domains(self, resellername=None):
try:
users = self.reseller_users(resellername)
return dict(self.cpinfo(users, keyls=('cplogin', 'dns')))
except NotFound:
return {}
def get_user_login_url(self, domain):
url_template = self._api.panel_info().user_login_url
if url_template is None:
return url_template
return url_template.format(domain=domain)
def get_reseller_id_pairs(self):
return {r.name: r.id for r in self._api.resellers()}
def get_admin_locale(self):
main_admin = self._api.admins(is_main=True)[0]
return main_admin.locale_code
def get_unsupported_cl_features(self) -> tuple[Feature, ...]:
feature_is_enabled_map = self._api.panel_info().supported_cl_features
# when hoster does not define list of supported features
# we assume that all of them are supported
if feature_is_enabled_map is None:
if self.is_feature_lve_supported():
return tuple()
return (Feature.LVE, )
disabled_features = set(
feature for feature in ALL_CL_FEATURES
# we should show only feature that explicitly enabled by hoster
# that is why we use default False value here
if not feature_is_enabled_map.get(feature, False)
)
# LVE is critical for many other modules, so there is live-check
if self.is_feature_lve_supported():
if Feature.LVE in disabled_features:
disabled_features.remove(Feature.LVE)
else:
disabled_features.add(Feature.LVE)
# to not make all vendors edit theis custom scripts with new parameter
if Feature.AUTOTRACING in disabled_features and Feature.XRAY not in disabled_features:
disabled_features.remove(Feature.AUTOTRACING)
return tuple(disabled_features)
def is_feature_lve_supported(self):
from clcommon.lib.cledition import is_container # pylint: disable=import-outside-toplevel
from clcommon.lib.cledition import is_cl_solo_edition # pylint: disable=import-outside-toplevel
is_binary_exists = os.path.exists('/usr/sbin/lvectl')
return all([
is_binary_exists,
not is_container(),
not is_cl_solo_edition(skip_jwt_check=True),
])
def get_domains_php_info(self):
domains = self._api.domains(with_php=True)
domains_php_info = {}
for domain, domain_info in domains.items():
handler_type = 'fpm' if domain_info.php.fpm \
else 'cgi'
if domain_info.php.handler:
handler_type = domain_info.php.handler
php_version_full = domain_info.php.php_version_id
if not php_version_full:
continue
domains_php_info[domain] = {
'username': domain_info.owner,
'php_version_id': php_version_full,
'handler_type': handler_type,
'display_version': php_version_full
}
return domains_php_info
def get_installed_php_versions(self):
phps = self._api.php()
php_description = []
for php in phps:
php_description.append(PHPDescription(
identifier=php.identifier,
version=f'{php.identifier[-2]}.{php.identifier[-1]}',
dir=php.dir,
modules_dir=php.modules_dir,
bin=php.bin,
ini=php.ini,
))
return php_description