# -*- coding: utf-8 -*-
import json
import logging
import os
import pwd
import subprocess
import sys
import urllib.request
import urllib.parse
import urllib.error
from configparser import ConfigParser, NoOptionError, NoSectionError
from collections import OrderedDict
import requests
from traceback import format_exc
from clcommon.cpapi.pluginlib import getuser
from urllib.parse import urlparse
from clcommon import ClPwd
from clcommon.cpapi.cpapiexceptions import DuplicateData, CPAPIExternalProgramFailed, ParsingError, \
EncodingError, NotSupported
from clcommon.lib.whmapi_lib import WhmApiRequest, WhmApiError, WhmNoPhpBinariesError
from clcommon.utils import run_command, ExternalProgramFailed, grep, get_file_lines
from clcommon.clconfpars import load as loadconfig
from clcommon.cpapi.cpapiexceptions import NoDBAccessData, CpApiTypeError, NoDomain
from clcommon.cpapi.GeneralPanel import (
GeneralPanelPluginV1,
CPAPI_CACHE_STORAGE,
PHPDescription,
DomainDescription
)
from clcommon.clconfpars import load_fast
__cpname__ = 'cPanel'
DBMAPPING_SCRIPT = os.path.join(os.path.dirname(sys.executable), "cpanel-dbmapping")
UAPI = '/usr/bin/uapi'
logger = logging.getLogger(__name__)
# WARN: Probably will be deprecated for our "official" plugins.
# See pluginlib.detect_panel_fast()
def detect():
return os.path.isfile('/usr/local/cpanel/cpanel')
CPANEL_DB_CONF = '/root/.my.cnf'
CPANEL_USERPLANS_PATH = '/etc/userplans'
CPANEL_DATABASES_PATH = '/var/cpanel/databases/'
CPANEL_USERS_DIR = '/var/cpanel/users/'
CPANEL_RESELLERS_PATH = '/var/cpanel/resellers'
CPANEL_USERDATADOMAINS_PATH = '/etc/userdatadomains;/var/cpanel/userdata/{user}/cache'
CPANEL_USERDATAFOLDER_PATH = '/var/cpanel/userdata/{user}'
CPANEL_ACCT_CONF_PATH = '/etc/wwwacct.conf'
CPANEL_USEROWNERS_FILE = '/etc/trueuserowners'
SYSCONF_CLOUDLINUX_PATH = '/etc/sysconfig/cloudlinux'
CPANEL_CONFIG = '/var/cpanel/cpanel.config'
USERCONF_PARAM_MAP = {
'dns': 'dns',
'package': 'plan',
'reseller': 'owner',
'mail': 'contactemail',
'locale': 'locale',
'cplogin': 'user'
}
SUPPORTED_CPANEL_CPINFO = ('cplogin', 'package', 'mail', 'reseller', 'dns', 'locale')
def db_access(_conf_path=CPANEL_DB_CONF):
access = {}
reg_data_config = ConfigParser(
allow_no_value=True,
interpolation=None,
strict=False
)
opened_files = reg_data_config.read(_conf_path)
if not opened_files:
raise NoDBAccessData(
f"Cannot find database access data for localhost. No such file {_conf_path}"
)
# Options in MySQL config files can be double- or single-quoted, so we strip() them
try:
if reg_data_config.has_option(section="client", option="password"):
access["pass"] = reg_data_config.get(
section="client",
option="password",
raw=True
).strip("\"'")
else:
access["pass"] = reg_data_config.get(
section="client",
option="pass",
raw=True
).strip("\"'")
access["login"] = reg_data_config.get(
section="client",
option="user",
raw=True
).strip("\"'")
except (NoOptionError, NoSectionError) as err:
raise NoDBAccessData(
"Cannot find database access data for localhost from config "
f"file {_conf_path}; {err.message}"
) from err
access["db"] = "mysql"
return access
def cpusers(_userplans_file=CPANEL_USERPLANS_PATH):
"""
Parse the file /etc/userplans, which contains the pairs of user-plan
:param _userplans_file: path to the user's plans file
:return: list of the non-system users
"""
with open(_userplans_file, encoding='utf-8') as stream:
users_list = [line.split(':')[0].strip() for line in stream
if not line.startswith('#') and line.count(':') == 1 and len(line.strip()) > 3]
return tuple(users_list)
def resellers(_resellers_path=CPANEL_RESELLERS_PATH):
if not os.path.isfile(_resellers_path): # on a clean system, this file may not be
return tuple()
with open(_resellers_path, encoding='utf-8') as stream:
# Example of file
# res1res1:add-pkg,add-pkg-ip,add-pkg-shell
# res1root:add-pkg,add-pkg-ip,add-pkg-shell,allow-addoncreate,allow-emaillimits-pkgs
# r:
resellers_list = [line.split(':', 1)[0].strip() for line in stream
if not line.startswith('#') and (':' in line) and len(line.strip()) > 1]
return tuple(resellers_list)
def admins():
return {'root'}
def is_reseller(username, _resellers_path=CPANEL_RESELLERS_PATH):
"""
Check if given user is reseller;
:param _resellers_path: for testing only
:type username: str
:rtype: bool
"""
return any(cplogin == username for cplogin in resellers(_resellers_path))
def dblogin_cplogin_pairs(cplogin_lst=None, with_system_users=False):
"""
Get mapping between system and DB users
@param cplogin_lst :list: list with usernames for generate mapping
@param with_system_users :bool: add system users to result list or no.
default: False
"""
# initialize results list
results = []
# accept only list and tuple parameters
uid_list = []
for username in (cplogin_lst or []):
try:
uid_list.append(str(pwd.getpwnam(username).pw_uid))
except KeyError:
# no user exists - skip it
uid_list.append("-1")
# generate system command
params = [DBMAPPING_SCRIPT]
if not with_system_users:
params.append("--nosys")
params += uid_list
with subprocess.Popen(
params,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
) as p:
output, _ = p.communicate()
# output format: "DBuser user UID"
for line in output.split("\n"):
line = line.strip()
if line:
results.append(line.split()[:2])
return tuple(results)
def cpinfo(cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns', 'locale'),
_cpanel_users_dir=CPANEL_USERS_DIR, quiet=True, search_sys_users=True):
returned = []
if isinstance(cpuser, str):
cpusers_list = [cpuser]
elif isinstance(cpuser, (list, tuple)):
cpusers_list = tuple(cpuser)
elif cpuser is None:
cpusers_list = cpusers(_userplans_file=CPANEL_USERPLANS_PATH)
else:
raise CpApiTypeError(funcname='cpinfo', supportedtypes='str|unicode|list|tuple',
received_type=type(cpuser).__name__)
for cpuser in cpusers_list:
user_config_path = os.path.join(_cpanel_users_dir, cpuser)
if not os.path.exists(user_config_path):
if not quiet:
sys.stderr.write(f'WARNING: Can not load data to the user "{cpuser}"; '
'Perhaps there is no such user in cPanel')
continue
# ignore bad symbols in config here
# because of LVEMAN-1150 (which also seems
# being already fixed by cpanel)
cpuser_data = loadconfig(user_config_path, ignore_bad_encoding=True)
user_data = []
for data_key in keyls:
data = cpuser_data.get(USERCONF_PARAM_MAP.get(data_key))
# USERCONF_PARAM_MAP.get('cplogin') == user
# check if user tag in user config and if tag is missing - use file name as user name
if data_key == 'cplogin' and data is None:
data = os.path.basename(user_config_path)
user_data.append(data)
returned.append(tuple(user_data))
if 'mail' in keyls: # checking the presence of an additional e-mail
additional_mail = cpuser_data.get('contactemail2')
if additional_mail:
user_data[list(keyls).index('mail')] = additional_mail
user_data_tuple = tuple(user_data)
if user_data_tuple not in returned:
returned.append(tuple(user_data))
return tuple(returned)
def get_admin_email(_conf1=None, _conf2=None, _hostname=None):
"""
:param str|None _conf1: for testing
:param str|None _conf2: for testing
:param str|None _hostname: for testing
:return:
"""
# 1. Try to get admin email from /etc/sysconfig/cloudlinux
lines = []
try:
lines = get_file_lines(_conf1 or SYSCONF_CLOUDLINUX_PATH)
except (OSError, IOError):
pass
for line in lines:
if line.startswith('EMAIL'):
parts = line.split('=')
if len(parts) == 2 and '@' in parts[1].strip():
return parts[1].strip()
# 2. Try to get admin email from /etc/wwwacct.conf
lines = []
try:
lines = get_file_lines(_conf2 or CPANEL_ACCT_CONF_PATH)
except (OSError, IOError):
pass
host = ''
for line in lines:
if line.startswith('CONTACTEMAIL'):
s = line.replace('CONTACTEMAIL', '').strip()
if s:
return s
if line.startswith('HOST'):
s = line.replace('HOST', '').strip()
if s:
host = s
if host:
return 'root@' + host
# Admin email not found in system files, use common address
from clcommon.cpapi.plugins.universal import get_admin_email # pylint: disable=import-outside-toplevel
return get_admin_email(_hostname=_hostname)
def _parse_userdatadomains(_path, parser, quiet=True):
if '{user}' in _path:
call_as_user = pwd.getpwuid(os.geteuid()).pw_name
_path = _path.replace('{user}', call_as_user)
path_list = _path.split(';')
for path_ in path_list:
if not os.path.exists(path_):
continue
try:
with open(path_, encoding='utf-8') as stream:
# example line:
# test.russianguns.ru: russianguns==root==sub==
# russianguns.ru==/home/russianguns/fla==192.168.122.40:80======0
for i, line in enumerate(stream):
if not line.strip(): # ignore the empty string
continue
if line.count(': ') != 1:
if not quiet:
sys.stderr.write(f'Can\'t parse {i} line in file "{path_}"; line was ignored\n')
continue
domain, domain_raw_data = line.split(': ')
domain_data = domain_raw_data.strip().split('==')
parser(path_, domain, domain_data)
except IOError as e:
if not quiet:
sys.stderr.write(f"Can't open file {path_} [{e}]\n")
continue
def _parse_userdataaliases(_path, quiet=True):
path_list = _path.split(';')
aliases = []
for path_ in path_list:
if not os.path.exists(path_):
continue
try:
with open(path_, encoding='utf-8') as stream:
# example line:
# test.russianguns.ru: russianguns==root==sub==
# russianguns.ru==/home/russianguns/fla==192.168.122.40:80======0
for i, line in enumerate(stream):
if not line.strip(): # ignore the empty string
continue
if "serveralias" not in line:
continue
aliases += line.replace("serveralias: ", '').strip().split(' ')
except IOError as e:
if not quiet:
sys.stderr.write(f"Can't open file {path_} [{e}]\n")
continue
return aliases
def useraliases(cpuser, domain, _path=CPANEL_USERDATAFOLDER_PATH, quiet=True):
# use dict to avoid duplicates
if '{user}' in _path:
_path = _path.replace('{user}', cpuser)
_path = os.path.join(_path, domain)
aliases = _parse_userdataaliases(_path, quiet=quiet)
return list(aliases)
def docroot(domain, _path=CPANEL_USERDATADOMAINS_PATH, quiet=True):
domain = domain.strip()
pathes = set()
result = {'docroot_path': None, 'user': None}
def parser(path, d, domain_data):
pathes.add(path)
if d == domain:
result['docroot_path'] = domain_data[4]
result['user'] = domain_data[0]
_parse_userdatadomains(_path, parser, quiet=quiet)
if not (result['docroot_path'] is None or result['user'] is None):
return result['docroot_path'], result['user']
watched = '; '.join(
['Can\'t find record "%(d)s" in file "%(p)s"' % {'d': domain, 'p': p} for p in pathes]
)
raise NoDomain(f"Can't obtain document root for domain '{domain}'; {watched}")
# User name to domain cache
# Example:
# { 'user1': [('user1.org', '/home/user1/public_html'),
# ('mk.user1.com.user1.org', '/home/user1/public_html/www.mk.user1.com')] }
_user_to_domains_map_cpanel = {}
def userdomains(cpuser, _path=CPANEL_USERDATADOMAINS_PATH, quiet=True):
# If user present in cache, take data from it
if cpuser in _user_to_domains_map_cpanel:
return _user_to_domains_map_cpanel[cpuser]
# use dict to avoid duplicates
domains_tmp = OrderedDict()
domains = OrderedDict()
def parser(path, d, domain_data):
user_ = domain_data[0]
document_root = domain_data[4]
# Update cache
if user_ in _user_to_domains_map_cpanel:
user_data = _user_to_domains_map_cpanel[user_]
else:
user_data = []
if 'main' == domain_data[2]:
# insert main domain to 1st position in list
if (d, document_root) not in user_data:
user_data.insert(0, (d, document_root))
else:
if (d, document_root) not in user_data:
user_data.append((d, document_root))
_user_to_domains_map_cpanel[user_] = user_data
if user_ == cpuser:
if 'main' == domain_data[2]:
# main domain must be first in list
domains.update({d: document_root})
else:
domains_tmp.update({d: document_root})
_parse_userdatadomains(_path, parser, quiet=quiet)
domains.update(domains_tmp)
return list(domains.items())
def domain_owner(domain, _path=CPANEL_USERDATADOMAINS_PATH, quiet=True):
users_list = []
def parser(path, d, domain_data):
if d == domain:
users_list.append(domain_data[0])
_parse_userdatadomains(_path, parser, quiet=quiet)
if len(users_list) > 1:
raise DuplicateData(
f"domain {domain} belongs to few users: [{','.join(users_list)}]"
)
if len(users_list) == 0:
return None
return users_list[0]
def homedirs(_sysusers=None, _conf_path = CPANEL_ACCT_CONF_PATH):
"""
Detects and returns list of folders contained the home dirs of users of the cPanel
:param str|None _sysusers: for testing
:param str|None _conf_path: for testing
:return: list of folders, which are parent of home dirs of users of the panel
"""
HOMEDIR = 'HOMEDIR '
HOMEMATCH = 'HOMEMATCH '
homedirs = []
users_homedir = ''
users_home_match = ''
if os.path.exists(_conf_path):
lines = get_file_lines(_conf_path)
for line in lines:
if line.startswith(HOMEDIR):
users_homedir = line.split(HOMEDIR)[1].strip()
elif line.startswith(HOMEMATCH):
users_home_match = line.split(HOMEMATCH)[1].strip()
if users_homedir:
homedirs.append(users_homedir)
clpwd = ClPwd()
users_dict = clpwd.get_user_dict()
# for testing only
if isinstance(_sysusers, (list, tuple)):
class pw:
def __init__(self, name, dir):
self.pw_name = name
self.pw_dir = dir
users_dict = {}
for (name,dir) in _sysusers:
users_dict[name] = pw(name, dir)
for user_data in users_dict.values():
userdir = user_data.pw_dir
if os.path.exists(userdir + '/public_html') or os.path.exists(userdir + '/www'):
homedir = os.path.dirname(userdir)
if users_home_match and homedir.find('/'+users_home_match) == -1:
continue
if homedir not in homedirs:
homedirs.append(homedir)
return homedirs
def _reseller_users_parser(json_string):
try:
json_serialized = json.loads(json_string)
result = json_serialized['result']
return [item['user'] for item in result['data']]
except (KeyError, ValueError, TypeError) as e:
raise ParsingError(str(e)) from e
def _reseller_users_json(reseller_name=None):
"""
Call UAPI and get json string;
:type reseller_name: str | None
:raises: ParsingError, CPAPIExternalProgramFailed
:rtype: str
"""
reseller_name = reseller_name or getuser()
# Attention!! /usr/bin/uapi utility may works unstable. See PTCLLIB-95 for details
cmd = [UAPI, 'Resellers', 'list_accounts', '--output=json']
# root user MUST specify --user, and reseller CAN'T do that
if reseller_name != getuser() or getuser() == "root":
cmd.append(f'--user={urllib.parse.quote(reseller_name)}')
try:
json_string = run_command(cmd=cmd, return_full_output=True)[1] # take only std_out, ignore std_err
except ExternalProgramFailed as e:
raise CPAPIExternalProgramFailed(str(e)) from e
return json_string
def reseller_users(resellername):
"""
Return reseller users
:param resellername: reseller name; return empty list if None
:return list[str]: user names list
"""
# Attention!! /usr/bin/uapi utility may works unstable. See PTCLLIB-95 for details
# json_string = _reseller_users_json(resellername)
# return _reseller_users_parser(json_string)
# So we read reseller's user list from /etc/trueuserowners
# /etc/trueuserowners example:
# #userowners v1
# cltest1: root
# res: res
# res1: root
# res2: root
# res2user1: res2
# res2usr1: res2
# resnew: resnew
# resnew1: resnew1
# rn1user1: resnew1
if resellername is None:
return []
result = []
userowner_file_data = get_file_lines(CPANEL_USEROWNERS_FILE)
if resellername is not None:
for line in grep(rf'\: {resellername}$',
fixed_string=False,
match_any_position=True,
multiple_search=True,
data_from_file=userowner_file_data):
splitted_line = line.strip().split(': ')
result.append(splitted_line[0])
return result
def _reseller_user_domains_parser(json_string):
try:
json_serialized = json.loads(json_string)
result = json_serialized['result']
users_data = {}
for item in result['data']:
users_data[item['user']] = item['domain']
return users_data
except (KeyError, ValueError, TypeError) as e:
raise ParsingError(str(e)) from e
def reseller_domains(reseller_name=None):
"""
Get dict[user, domain]
Attention!! This function may work unstable. See PTCLLIB-95 for details.
:param reseller_name: reseller's name
:rtype: dict[str, str|None]
:raises DomainException: if cannot obtain domains
"""
json_string = _reseller_users_json(reseller_name)
return _reseller_user_domains_parser(json_string)
def get_user_login_url(domain):
return f'http://{domain}:2083'
def is_no_php_binaries_on_cpanel():
"""
Checks that there are no installed php binaries
only for cpanel
"""
try:
WhmApiRequest('php_get_installed_versions').call()
except WhmNoPhpBinariesError:
return True
return False
class PanelPlugin(GeneralPanelPluginV1):
DEFAULT_LOCALE = 'en'
BAD_CODING_ERROR_CODE = 48
HTTPD_CONFIG_FILE = '/etc/apache2/conf/httpd.conf'
def __init__(self):
super().__init__()
def invalidate_cpapi_cache(self):
"""
Goes through all panel caches and invalidates it if needed
"""
method_marker_pairs = (('_get_php_version_id_to_handler_map', ['/etc/cpanel/ea4/php.conf']),
('_get_vhosts_php_versions', ['/etc/userdatadomains', '/etc/cpanel/ea4/php.conf']))
for pair in method_marker_pairs:
method, markers = pair[0], pair[1]
cache_file = os.path.join(CPAPI_CACHE_STORAGE, method + '.cache')
if self.is_cache_valid(cache_file, markers):
# cache is up to dated -> nothing to do
continue
# cache is outdated -> rewrite
data = getattr(self, method)()
self.rewrite_cpapi_cache(data, cache_file)
def _run_long_script(self, args):
"""
Processes decoding errors from long script which mean
that cpanel wrote something bad to config file (most likely LVEMAN-1150)
:param args: arguments to pass
:return: stdout, stderr
"""
with subprocess.Popen(
[self._custom_script_name] + args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
) as p:
out, err = p.communicate()
returncode = p.returncode
if returncode == self.BAD_CODING_ERROR_CODE:
raise EncodingError(
"Problem with encoding in %(script)s file, error is: '%(error)s'.",
script=self._custom_script_name, error=err
)
return out, err, returncode
def getCPName(self):
"""
Return panel name
:return:
"""
return __cpname__
def get_cp_description(self):
"""
Retrieve panel name and it's version
:return: dict: { 'name': 'panel_name', 'version': 'panel_version', 'additional_info': 'add_info'}
or None if can't get info
"""
try:
with subprocess.Popen(
['/usr/local/cpanel/cpanel', '-V'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
) as p:
out, _ = p.communicate()
return {'name': __cpname__, 'version': out.split()[0], 'additional_info': None}
except Exception:
return None
def db_access(self):
"""
Getting root access to mysql database.
For example {'login': 'root', 'db': 'mysql', 'host': 'localhost', 'pass': '9pJUv38sAqqW'}
:return: root access to mysql database
:rtype: dict
:raises: NoDBAccessData
"""
return db_access()
def cpusers(self):
"""
Generates a list of cpusers registered in the control panel
:return: list of cpusers registered in the control panel
:rtype: tuple
"""
return cpusers()
def resellers(self):
"""
Generates a list of resellers in the control panel
:return: list of cpusers registered in the control panel
:rtype: tuple
"""
return resellers()
def is_reseller(self, username):
"""
Check if given user is reseller;
:type username: str
:rtype: bool
"""
return is_reseller(username)
def dblogin_cplogin_pairs(self, cplogin_lst=None, with_system_users=False):
"""
Get mapping between system and DB users
@param cplogin_lst :list: list with usernames for generate mapping
@param with_system_users :bool: add system users to result list or no.
default: False
"""
return dblogin_cplogin_pairs(cplogin_lst, with_system_users)
def cpinfo(self, cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns', 'locale'),
search_sys_users=True):
"""
Retrieves info about panel user(s)
:param str|unicode|list|tuple|None cpuser: user login
:param keyls: list of data which is necessary to obtain the user,
the valuescan be:
cplogin - name/login user control panel
mail - Email users
reseller - name reseller/owner users
locale - localization of the user account
package - User name of the package
dns - domain of the user
:param bool search_sys_users: search for cpuser in sys_users or in control panel users (e.g. for Plesk)
:return: returns a tuple of tuples of data in the same sequence as specified keys in keylst
:rtype: tuple
"""
return cpinfo(cpuser, keyls, search_sys_users=search_sys_users)
def get_admin_email(self):
"""
Retrieve admin email address
:return: Host admin's email
"""
return get_admin_email()
@staticmethod
def useraliases(cpuser, domain):
"""
Return aliases from user domain
:param str|unicode cpuser: user login
:param str|unicode domain:
:return list of aliases
"""
return useraliases(cpuser, domain)
def docroot(self, domain):
"""
Return document root for domain
:param str|unicode domain:
:return Cortege: (document_root, owner)
"""
return docroot(domain)
def userdomains(self, cpuser):
"""
Return domain and document root pairs for control panel user
first domain is main domain
:param str|unicode cpuser: user login
:return list of tuples (domain_name, documen_root)
"""
return userdomains(cpuser)
def homedirs(self):
"""
Detects and returns list of folders contained the home dirs of users of the cPanel
:return: list of folders, which are parent of home dirs of users of the panel
"""
return homedirs()
def reseller_users(self, resellername=None):
"""
Return reseller users
:param resellername: reseller name; autodetect name if None
:return list[str]: user names list
"""
return reseller_users(resellername)
def reseller_domains(self, resellername=None):
"""
Get dict[user, domain]
Attention!! This function may work unstable. See PTCLLIB-95 for details.
:param reseller_name: reseller's name
:rtype: dict[str, str|None]
:raises DomainException: if cannot obtain domains
"""
return reseller_domains(resellername)
def get_user_login_url(self, domain):
"""
Get login url for current panel;
:type domain: str
:rtype: str
"""
return get_user_login_url(domain)
def domain_owner(self, domain):
"""
Return domain's owner
:param domain: Domain/sub-domain/add-domain name
:rtype: str
:return: user name or None if domain not found
"""
return domain_owner(domain)
def get_system_php_info(self):
try:
default_version = WhmApiRequest(
'php_get_system_default_version').call()['version']
except WhmApiError as e:
raise CPAPIExternalProgramFailed(e) from e
return {
'default_version_id': default_version
}
def get_domains_php_info(self):
"""
Returns info about domains: username, php_version, handler_type
For each domain we detect handler and php_version
_get_php_version_id_to_handler_map() returns data of installed versions,
so if the version of some domain was removed we can`t detect the handler.
In such case we set handler_type to None.
Otherwise we detect handler and set it to handler_type
:rtype dict
"""
php_version_to_handler_map = self._get_php_version_id_to_handler_map()
php_settings_per_vhost = self._get_vhosts_php_versions()
domains_php_info = {}
for domain_info in php_settings_per_vhost:
php_version_id = domain_info['version']
if php_version_id not in list(php_version_to_handler_map.keys()):
logger.error("Unable to find php %s in handlers map %s. ",
php_version_id, php_version_to_handler_map,
extra={
'php_version_id': php_version_id,
'php_version_to_handler_map': php_version_to_handler_map
})
handler_type = None
else:
handler_type = 'fpm' if domain_info['php_fpm'] \
else php_version_to_handler_map[php_version_id]
domains_php_info[domain_info['vhost']] = DomainDescription(
username=domain_info['account'],
php_version_id=php_version_id,
handler_type=handler_type,
display_version=php_version_id
)
return domains_php_info
@staticmethod
def get_installed_php_versions():
"""
Get the list of PHP version installed in panel
:return: list
"""
try:
# ['alt-php56', 'alt-php72', 'ea-php74']
php_versions = WhmApiRequest('php_get_installed_versions').call()['versions']
except (KeyError, WhmApiError) as e:
logger.error('CPAPI: Could not get list of installed PHP versions: %s', e)
# todo: consider changing this to return exceptions
return []
else:
php_description = []
for php_name in php_versions:
if php_name.startswith('alt-'):
php_root_dir = f'/opt/{php_name.replace("-", "/")}/'
php_description.append(PHPDescription(
identifier=php_name,
version=f'{php_name[-2]}.{php_name[-1]}',
dir=os.path.join(php_root_dir),
modules_dir=os.path.join(php_root_dir, 'usr/lib64/php/modules/'),
bin=os.path.join(php_root_dir, 'usr/bin/php'),
ini=os.path.join(php_root_dir, 'link/conf/default.ini'),
))
elif php_name.startswith('ea-'):
php_root_dir = f'/opt/cpanel/{php_name}/root/'
php_description.append(PHPDescription(
identifier=php_name,
version=f'{php_name[-2]}.{php_name[-1]}',
modules_dir=os.path.join(php_root_dir, 'usr/lib64/php/modules/'),
dir=os.path.join(php_root_dir),
bin=os.path.join(php_root_dir, 'usr/bin/php'),
ini=os.path.join(php_root_dir, 'etc/php.ini'),
))
else:
# unknown php, skip
continue
return php_description
@staticmethod
@GeneralPanelPluginV1.cache_call(panel_parker=['/etc/userdatadomains',
'/etc/cpanel/ea4/php.conf'])
def _get_vhosts_php_versions():
"""
See https://documentation.cpanel.net/display/DD/WHM+API+1+Functions+-+php_get_vhost_versions
:rtype: dict
"""
try:
return WhmApiRequest('php_get_vhost_versions').call()['versions']
except WhmApiError as e:
raise CPAPIExternalProgramFailed(e) from e
@staticmethod
@GeneralPanelPluginV1.cache_call(panel_parker=['/etc/cpanel/ea4/php.conf'])
def _get_php_version_id_to_handler_map():
"""
Returns dict with info about php version and it`s current handler:
{'ea-php56': 'cgi', 'ea-php72': 'suphp', 'alt-php51': 'suphp', 'alt-php52': 'suphp' ...}
Using cpanel whmapi request
Tries to get all handlers or if there is problem with some handler - gets handlers one by one
As a result information could be incomplete if some handlers are not available
See https://documentation.cpanel.net/display/DD/WHM+API+1+Functions+-+php_get_handlers
:rtype: dict
"""
try:
handlers = WhmApiRequest('php_get_handlers').call()['version_handlers']
except WhmApiError as e:
logger.error("Unable to get information about php handlers, "
"falling back to per-handler data gathering. "
"Error happened: %s", e,
extra={
'error_message': e.message,
'error_context': e.context
})
handlers = PanelPlugin._get_handler_info_for_each_version()
return {
php['version']: php['current_handler'] for php in handlers
}
@staticmethod
def _get_handler_info_for_each_version():
"""
Gets handler data from each version one by one,
so that data can still be collected
even when one of the installed versions is broken.
:rtype: list
"""
handlers = []
installed_php_versions = PanelPlugin.get_installed_php_versions()
for version in installed_php_versions:
# {'version_handlers': [{'available_handlers': ['cgi', 'none'], 'version': 'ea-php72',
# 'current_handler': None}]}
try:
version_handler = \
WhmApiRequest('php_get_handlers').with_arguments(
version=version['identifier']
).call()['version_handlers'][0]
handlers.append(version_handler)
except (KeyError, WhmApiError) as e:
logger.error('CPAPI: Could not get data for PHP version: %s', e)
continue
return handlers
def get_admin_locale(self):
cpanel_config = load_fast(CPANEL_CONFIG)
try:
server_locale = cpanel_config['server_locale']
if server_locale:
return server_locale
return PanelPlugin.DEFAULT_LOCALE
except KeyError:
return PanelPlugin.DEFAULT_LOCALE
@staticmethod
def get_apache_connections_number():
"""
Retrieves Apache's connections number
:return: tuple (conn_num, message)
conn_num - current connections number, 0 if error
message - OK/Trace
"""
# curl http://127.0.0.1/whm-server-status?auto | grep "Total Accesses"
try:
url = 'http://127.0.0.1/whm-server-status?auto'
response = requests.get(url, timeout=5)
if response.status_code != 200:
return 0, f"GET {url} response code is {response.status_code}"
s_response = response.content.decode('utf-8')
s_response_list = s_response.split('\n')
out_list = list(grep("Total Accesses", data_from_file=s_response_list))
# out_list example: ['Total Accesses: 200']
s_total_accesses = out_list[0].split(':')[1].strip()
return int(s_total_accesses), 'OK'
except Exception:
return 0, format_exc()
@staticmethod
def get_apache_ports_list():
"""
Retrieves active httpd's ports from httpd's config
:return: list of apache's ports
"""
# cat /etc/apache2/conf/httpd.conf | grep Listen
_httpd_ports_list = []
try:
lines = get_file_lines(PanelPlugin.HTTPD_CONFIG_FILE,
unicode_errors_handle='surrogateescape')
except (OSError, IOError):
return None
lines = [line.strip() for line in lines]
for line in grep('Listen', match_any_position=False, multiple_search=True, data_from_file=lines):
# line examples:
# Listen 0.0.0.0:80
# Listen [::]:80
try:
value = int(line.split(':')[-1])
if value not in _httpd_ports_list:
_httpd_ports_list.append(value)
except (IndexError, ValueError):
pass
if not _httpd_ports_list:
_httpd_ports_list.append(80)
return _httpd_ports_list
@staticmethod
def get_apache_max_request_workers():
"""
Get current maximum request apache workers from httpd's config
:return: tuple (max_req_num, message)
max_req_num - Maximum request apache workers number or 0 if error
message - OK/Trace
"""
# cat /etc/apache2/conf/httpd.conf | grep MaxRequestWorkers
# MaxRequestWorkers 150
try:
lines = get_file_lines(PanelPlugin.HTTPD_CONFIG_FILE,
unicode_errors_handle='surrogateescape')
mrw_list = list(grep('MaxRequestWorkers', match_any_position=False, data_from_file=lines))
if len(mrw_list) != 1:
return 0, 'MaxRequestWorkers directive is absent or multiple in httpd\'s config'
parts = mrw_list[0].split(' ')
if len(parts) == 2:
return int(parts[1]), 'OK'
return 0, f'httpd config line syntax error. Line is \'{mrw_list[0]}\''
except (OSError, IOError, IndexError, ValueError):
return 0, format_exc()
@staticmethod
def get_user_emails_list(username: str, domain: str) -> str:
# "acct" : [
# {
# "has_backup" : 0,
# "email" : "bla@cloudlinux.com, blabla@gmail.com"
# }
# ]
emails = \
WhmApiRequest('listaccts').with_arguments(want='email',
searchmethod='exact', search=username,
searchtype='user').call()['acct'][0]
user_emails = emails['email']
if user_emails == '*unknown*':
return ''
return user_emails
@staticmethod
def panel_login_link(username):
link = WhmApiRequest('create_user_session').with_arguments(user=username,
service='cpaneld').call()['url']
if not link:
return ''
# https://77.79.198.14:2083/cpsess3532861743/login/?session=stackoverflow%3ascBEPeVeSXqZgMLs%.. ->
# https://77.79.198.14:2083/
parsed = urlparse(link)
return f'{parsed.scheme}://{parsed.netloc}/'
@staticmethod
def panel_awp_link(username):
link = PanelPlugin.panel_login_link(username).rstrip("/")
if len(link) == 0:
return ''
return f'{link}/cpsess0000000000/frontend/paper_lantern/lveversion/wpos.live.pl'
def get_server_ip(self):
try:
with open('/var/cpanel/mainip', encoding='utf-8') as f:
return f.read().strip()
except FileNotFoundError as e:
raise NotSupported(
'Unable to detect main ip for this server. '
'Contact CloudLinux support and report the issue.'
) from e
def suspended_users_list(self):
# [{'time': 'Tue Mar 26 12:41:31 2024', 'owner': 'root', 'is_locked': 0,
# 'unixtime': 1711456891, 'reason': 'Unknown', 'user': 'susp2'},
# {'is_locked': 0, 'time': 'Tue Mar 26 12:18:53 2024', 'owner':
# 'root', 'reason': 'Unknown', 'user': 'susp', 'unixtime': 1711455533}]
suspended_info = WhmApiRequest('listsuspended').call()['account']
return [item['user'] for item in suspended_info]
def get_unsupported_cl_features(self):
return tuple()