# -*- coding: utf-8 -*-
"""
CloudLinux API for DirectAdmin control panel
"""
import glob
import os
import re
import subprocess
import sys
import syslog
from traceback import format_exc
from typing import Dict, List, Tuple # NOQA
from urllib.parse import urlparse
import requests
from clcommon.clconfpars import (
WebConfigMissing,
WebConfigParsingError,
apache_conf_parser,
load_fast,
nginx_conf_parser,
read_unicode_file_with_decode_fallback,
)
from clcommon.clconfpars import load as loadconfig
from clcommon.clpwd import ClPwd
from clcommon.cpapi.cpapicustombin import (
_docroot_under_user_via_custom_bin,
get_domains_via_custom_binary,
)
from clcommon.cpapi.cpapiexceptions import (
CpApiTypeError,
NoDBAccessData,
NoDomain,
NoPanelUser,
ParsingError,
ReadFileError,
)
from clcommon.cpapi.GeneralPanel import (
DomainDescription,
GeneralPanelPluginV1,
PHPDescription,
)
from clcommon.cpapi.plugins.universal import (
get_admin_email as universal_get_admin_email,
)
from clcommon.features import Feature
from clcommon.utils import (
ExternalProgramFailed,
find_module_param_in_config,
get_file_lines,
grep,
)
__cpname__ = 'DirectAdmin'
DA_DIR = '/usr/local/directadmin'
DA_CONF = os.path.join(DA_DIR, 'conf/directadmin.conf')
DA_DATA_DIR = os.path.join(DA_DIR, 'data')
DA_DB_CONF = os.path.join(DA_DIR, 'conf/mysql.conf')
DA_USERS_PATH = os.path.join(DA_DATA_DIR, 'users')
DA_OPT_PATH = os.path.join(DA_DIR, 'custombuild', 'options.conf')
USER_CONF = 'user.conf'
DOMAINOWNERS = '/etc/virtual/domainowners'
ADMIN_DIR = os.path.join(DA_DATA_DIR, 'admin')
RESELLERS_LIST = os.path.join(ADMIN_DIR, 'reseller.list')
ADMINS_LIST = os.path.join(ADMIN_DIR, 'admin.list')
USER_PATTERN = re.compile(rf'.+/(.+)/{re.escape(USER_CONF)}')
# WARN: Probably will be deprecated for our "official" plugins.
# See pluginlib.detect_panel_fast()
def detect():
return os.path.isfile('/usr/local/directadmin/directadmin') or \
os.path.isfile('/usr/local/directadmin/custombuild/build')
def db_access():
access = {}
try:
login_data = loadconfig(DA_DB_CONF)
access['login'] = login_data['user']
access['pass'] = login_data['passwd']
except IOError as err:
raise NoDBAccessData(
'Can not open file with data to database access; ' + str(err)
) from err
except KeyError as err:
raise NoDBAccessData(
f'Can not get database access data from file {DA_DB_CONF}'
) from err
return access
def cpusers():
match_list = [USER_PATTERN.match(path) for path in glob.glob(os.path.join(DA_USERS_PATH, '*', USER_CONF))]
users_list = [match.group(1) for match in match_list if match]
return tuple(users_list)
def resellers():
with open(RESELLERS_LIST, encoding='utf-8') as f:
resellers_list = [line.strip() for line in f]
return tuple(resellers_list)
def admins():
with open(ADMINS_LIST, encoding='utf-8') as f:
admins_list = [line.strip() for line in f]
return set(admins_list)
def dblogin_cplogin_pairs(cplogin_lst=None, with_system_users=False):
from clcommon.cpapi.plugins.universal import _dblogin_cplogin_pairs # pylint: disable=import-outside-toplevel
access = db_access()
data = _dblogin_cplogin_pairs(cplogin_lst=cplogin_lst, access=access)
if with_system_users:
data += tuple(get_da_user(DA_USERS_PATH).items())
return data
def get_da_user(path, quiet=True):
users = {}
cur_dir = os.getcwd()
os.chdir(path)
dir_list = glob.glob('./*')
for user_dir in dir_list:
if os.path.isdir(user_dir):
file_domains = path + '/' + user_dir + '/domains.list'
try:
with open(file_domains, encoding='utf-8') as f:
if len(f.readline()) > 0:
user_name = user_dir[2:]
users[user_name] = user_name
except IOError:
if not quiet:
sys.stderr.write("No file " + file_domains)
os.chdir(cur_dir)
return users
def cpinfo(cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns', 'locale'), search_sys_users=True):
returned = []
if isinstance(cpuser, str):
cpusers_list = [cpuser]
elif isinstance(cpuser, (list, tuple)):
cpusers_list = tuple(cpuser)
elif cpuser is None:
cpusers_list = cpusers()
else:
raise CpApiTypeError(funcname='cpinfo', supportedtypes='str|unicode|list|tuple',
received_type=type(cpuser).__name__)
def _get_reseller(config):
if config.get('usertype') == 'reseller':
return config.get('username')
return config.get('creator')
_user_conf_map = {'cplogin': lambda config: config.get('username'),
'package': lambda config: config.get('package'),
'mail': lambda config: config.get('email'),
'reseller': lambda config: _get_reseller(config),
'dns': lambda config: config.get('domain'),
'locale': lambda config: config.get('language')}
keyls_ = [_user_conf_map[key] for key in keyls]
for username in cpusers_list:
user_conf_file = os.path.join(DA_USERS_PATH, username, USER_CONF)
if os.path.exists(user_conf_file):
user_config = load_fast(user_conf_file)
returned.append([key(user_config) for key in keyls_])
return returned
def _docroot_under_root(domain):
# type: (str) -> Tuple[str, str]
"""
Old method for getting doc_root for domain under root
Method reads DA config
:return: (doc_root, username) cortege
"""
user_name = None
# Load /etc/virtual/domainowners
_domain_to_user_map = _load_domains_owners()
# Find supposed owner of domain
for main_domain in list(_domain_to_user_map.keys()):
if domain == main_domain or domain.endswith(f'.{main_domain}'):
# Parent domain found
user_name = _domain_to_user_map[main_domain]
break
if user_name is None:
domains_list = []
else:
domains_list = userdomains(user_name)
for d in domains_list:
if domain in d:
return d[1], user_name
def _docroot_under_user_old_mechanism(domain):
# type: (str) -> Tuple[str, str]
"""
Old method for getting doc_root for domain under user
Method parses /home/<username>/domains directory
:return: (doc_root, username) cortege
"""
clpwd = ClPwd()
user_pw = clpwd.get_pw_by_uid(os.getuid())[0]
list_domains_and_doc_roots = _get_domains_list_as_user(user_pw.pw_dir)
for domain_data in list_domains_and_doc_roots:
if domain_data['server_name'] == domain:
return domain_data['document_root'], user_pw.pw_name
def docroot(domain):
# type: (str) -> Tuple[str, str]
"""
Retrieves document root for domain
:param domain: Domain to determine doc_root
:return: Cortege: (doc_root, domain_user)
"""
res = None
domain = domain.strip()
uid = os.getuid()
euid = os.geteuid()
if euid == 0 and uid == 0:
res = _docroot_under_root(domain)
else:
res = _docroot_under_user_via_custom_bin(domain)
# If there was successful result, res object will have
# (doc_root, domain_user) format. If there wasn't found any correct
# doc_roots, res will be None.
if res is not None:
return res
raise NoDomain(f"Can't obtain document root for domain '{domain}'")
def _is_nginx_installed():
"""
Check if nginx is installed via custombuild;
"""
config = loadconfig(DA_CONF)
return bool(int(config.get('nginx', 0)) or int(config.get('nginx_proxy', 0)))
def _get_domains_list_as_root(user_path):
"""
Get domains list for user from httpd or nginx config as root
:param user_path: path to DA directory of user's profile
:return: parsed httpd or nginx config
:rtype: list
"""
try:
if _is_nginx_installed():
httpd_conf = nginx_conf_parser(os.path.join(user_path, 'nginx.conf'))
else:
httpd_conf = apache_conf_parser(os.path.join(user_path, 'httpd.conf'))
except WebConfigParsingError as e:
raise ParsingError(e.message) from e
except WebConfigMissing:
return []
return httpd_conf
def _get_domains_list_as_user(user_home):
# type: (str) -> List[Dict[str, str, bool]]
"""
Get domains list for user from ~/domains directory as user.
Method DOESN'T search subdomains, because it's almost impossible detect by user's
folders without privileges escalation
:param user_home: path to user home
:return: list of dictionaries {'server_name': 'domain', 'document_root': 'doc_root', 'ssl': False}
"""
domains_dir = 'domains'
doc_root_dir = 'public_html'
domains_list = []
domains_path = os.path.join(user_home, domains_dir)
# Searching main domains
# All directories of main domains are saved in ~/domains directory
for domain_dir in os.listdir(domains_path):
domain_path = os.path.join(domains_path, domain_dir)
doc_root_path = os.path.join(domains_path, domain_dir, doc_root_dir)
if os.path.isdir(domain_path) and os.path.isdir(doc_root_path):
domains_list.append({
'server_name': domain_dir,
'document_root': doc_root_path,
'ssl': False,
})
else:
continue
return domains_list
def userdomains(cpuser, as_root=False):
# type: (str, bool) -> List[Tuple[str, str]]
"""
Get user's domains list
:return list: domain names
Example:
[('cltest1.com', '/home/cltest1/domains/cltest1.com/public_html'),
('mk.cltest1.com', '/home/cltest1/domains/cltest1.com/public_html/mk'),
('cltest11.com', '/home/cltest1/domains/cltest11.com/public_html')
]
"""
domains_list = []
user_path = os.path.join(DA_USERS_PATH, cpuser)
euid = os.geteuid()
# old method to get list of user's domains
main_domain_path = ''
if not os.path.exists(user_path):
return []
user_home = os.path.expanduser('~' + cpuser)
public_path = os.path.join(user_home, 'public_html')
if os.path.exists(public_path) and os.path.islink(public_path):
main_domain_path = os.path.realpath(public_path)
if euid == 0 or as_root:
httpd_conf = _get_domains_list_as_root(user_path)
for domain in httpd_conf:
if domain['ssl'] is True:
continue
# Put main domain in start of list
if domain['server_name'] in main_domain_path:
domains_list.insert(0, (domain['server_name'], domain['document_root']))
else:
domains_list.append((domain['server_name'], domain['document_root']))
return domains_list
# this case works the same as above but through the rights escalation binary wrapper
# call path: here -> binary -> python(diradmin euid) -> userdomains(as_root=True) -> print json result to stdout
rc, res = get_domains_via_custom_binary()
if rc == 0:
return res
elif rc == 11:
raise NoPanelUser(f'User {cpuser} not found in the database')
else:
raise ExternalProgramFailed(f'Failed to get userdomains: {res}')
def homedirs():
"""
Detects and returns list of folders contained the home dirs of users of the DirectAdmin
:return: list of folders, which are parent of home dirs of users of the panel
"""
home_dirs = set()
clpwd = ClPwd()
users_dict = clpwd.get_user_dict()
for user_name, pw_user in list(users_dict.items()):
conf_file = os.path.join(DA_USERS_PATH, user_name, USER_CONF)
if os.path.exists(conf_file):
home_dir = os.path.dirname(pw_user.pw_dir)
home_dirs.add(home_dir)
return list(home_dirs)
def domain_owner(domain):
"""
Return domain's owner
:param domain: Domain/sub-domain/add-domain name
:return: user name or None if domain not found
"""
return _load_domains_owners().get(domain, None)
@GeneralPanelPluginV1.cache_call(panel_parker=[DOMAINOWNERS])
def _load_domains_owners() -> Dict[str, str]:
"""
Get domain<->user map from /etc/virtual/domainowners file
"""
# 1. Load DA data file
try:
domains_lines = read_unicode_file_with_decode_fallback(DOMAINOWNERS).splitlines()
except (OSError, IOError) as e:
raise ReadFileError(str(e)) from e
# 2. File loaded successfully, parse data and fill dictionaries
_domain_to_user_map = {}
for line_ in domains_lines:
line_ = line_.strip()
# pass empty line
if not line_:
continue
domain_, user_ = line_.split(':')
domain_ = domain_.strip()
user_ = user_.strip()
# Fill domain to user map
_domain_to_user_map[domain_] = user_
return _domain_to_user_map
def reseller_users(resellername):
"""
Return list of reseller users
:param resellername: reseller name; return empty list if None
:return list[str]: user names list
"""
if resellername is None:
return []
all_users_dict = ClPwd().get_user_dict()
users_list_file = os.path.join(DA_USERS_PATH, resellername, 'users.list')
try:
with open(users_list_file, encoding='utf-8') as users_list:
users_list = [item.strip() for item in users_list]
users_list.append(resellername)
# performing intersection to avoid returning non-existing users
# that are still present in config file for some reason
return list(set(all_users_dict) & set(users_list))
except (IOError, OSError):
return []
def reseller_domains(resellername=None):
"""
Get pairs user <=> domain for given reseller;
Empty list if cannot get or no users found;
:type resellername: str
:return list[tuple[str, str]]: tuple[username, main_domain]
"""
if resellername is None:
return []
users = reseller_users(resellername)
return dict(cpinfo(users, keyls=('cplogin', 'dns')))
def get_admin_email():
admin_user_file = os.path.join(DA_USERS_PATH, 'admin', USER_CONF)
cnf = loadconfig(admin_user_file)
return cnf.get('email', universal_get_admin_email())
def is_reseller(username):
"""
Check if given user is reseller;
:type username: str
:rtype: bool
:raise: ParsingError, ReadFileError
"""
user_config = os.path.join(DA_USERS_PATH, username, USER_CONF)
if os.path.exists(user_config):
try:
return loadconfig(user_config)['usertype'] == 'reseller'
except IndexError as e:
raise ParsingError('User config exists, but no usertype given') from e
return False
def get_user_login_url(domain):
return f'http://{domain}:2222'
def _get_da_php_config():
"""
Return map (PHP_DA_CODE:{PHP_HANDLER, PHP_VERSION})
:return:
"""
_php_da_map = {}
try:
php_cfg = loadconfig(DA_OPT_PATH)
except (IOError, OSError):
return None
# iterate through custombuild options.conf php_mode and php_release options
i = 1
while f'php{i}_mode' in php_cfg and f'php{i}_release' in php_cfg:
_php_da_map[str(i)] = {}
_php_da_map[str(i)]['handler_type'] = php_cfg[f'php{i}_mode']
_php_da_map[str(i)]['php_version_id'] = php_cfg[f'php{i}_release']
i += 1
return _php_da_map
def _get_php_code_info_for_domain(domain, owner):
"""
Return php code from domain config
:param domain:
:param owner:
:return: string '1' or '2' - php code in DA
"""
domain_config_file = os.path.join(DA_USERS_PATH, str(owner), 'domains', str(domain) + '.conf')
try:
domain_config = loadconfig(domain_config_file)
except (IOError, OSError):
return '1'
domain_php = domain_config.get('php1_select')
# None - DA custombuild has only one php version
# '0' - it means that user selected default version PHP of DA custombuild
if domain_php is None or domain_php == '0':
domain_php = '1'
return domain_php
def _get_subdomains(all_domains, mapped_all_domains):
subdomains = []
for domain in all_domains:
if domain[0] in mapped_all_domains.keys():
continue
subdomains.append(domain[0])
return subdomains
def get_domains_php_info():
"""
Return php version information for each domain
:return: domain to php info mapping
Example output:
{'cltest.com': {'handler_type': 'mod_php',
'php_version_id': '7.1',
'username': 'cltest'},
'cltest2.com': {'handler_type': 'fastcgi',
'php_version_id': '7.3',
'username': 'kek_2'},
'cltest3.com': {'handler_type': 'suphp',
'php_version_id': '5.5',
'username': 'cltest3'},
'omg.kek': {'handler_type': 'php-fpm',
'php_version_id': '5.2',
'username': 'cltest'}}
:rtype: dict[str, dict]
"""
# returns only main domains
map_domain_user = _load_domains_owners()
result_map = {}
php_da_map = _get_da_php_config()
if php_da_map is None:
return result_map
owner_to_domains: dict[str, list[str]] = {}
for domain, owner in map_domain_user.items():
owner_to_domains.setdefault(owner, []).append(domain)
for owner, domains in owner_to_domains.items():
all_domains_in_httpd_file = userdomains(owner)
# get safely to not break something to other teams
try:
subdomains = _get_subdomains(all_domains_in_httpd_file, map_domain_user)
except Exception:
subdomains = []
for domain in domains:
php_info_code = _get_php_code_info_for_domain(domain, owner)
if php_info_code not in php_da_map \
or php_da_map[php_info_code]['php_version_id'] == 'no':
# 'no' means that php_release specified in user's config
# does not exist in custombuild options.conf
php_info_code = '1'
php_info = php_da_map[php_info_code]
try:
domain_aliases = _useraliases(owner, domain)
except Exception:
domain_aliases = []
# https://forum.directadmin.com/threads/sub-domain-different-php-version.58426/
# subdomain version should be the same as main domain
for domain_entity in [domain] + subdomains + domain_aliases:
result_map[domain_entity] = DomainDescription(
username=owner,
php_version_id=php_info['php_version_id'],
handler_type=php_info['handler_type'],
display_version=f'php{php_info["php_version_id"].replace(".", "")}'
)
return result_map
def _get_installed_alt_php_versions():
"""
Gets installed alt-phpXY - could be chosen via CloudLinux PHP Selector w/o being compiled via custombuild
"""
installed_list = []
alt_phps_directory = '/opt/alt/'
pattern = re.compile(r'^php\d+$')
for item in os.listdir(alt_phps_directory):
item_path = os.path.join(alt_phps_directory, item)
# Check if the item is a directory and its name matches the pattern
if os.path.isdir(item_path) and pattern.match(item) and os.path.exists(f'{item_path}/usr/bin/php'):
version = item.replace('php', '')
installed_list.append(PHPDescription(
identifier=f'alt-{item}',
version=f'{version[:1]}.{version[1:]}',
dir=f'{item_path}/',
modules_dir=os.path.join(item_path, 'usr/lib64/php/modules/'),
bin=os.path.join(item_path, 'usr/bin/php'),
ini=os.path.join(item_path, 'link/conf/default.ini'),
))
return installed_list
def _get_da_php_extension_dir(directadmin_php_dir):
return subprocess.run(
[f'{directadmin_php_dir}bin/php-config', '--extension-dir'],
text=True,
capture_output=True,
check=False,
).stdout
def _get_compiled_custombuild_versions():
"""
Gets compiled phpXY - could be chosen via DirectAdmin PHP Selector
"""
php_da_map = _get_da_php_config()
if php_da_map is None:
return []
# {'1': {'handler_type': 'php-fpm', 'php_version_id': '7.4'},
# '2': {'handler_type': 'php-fpm', 'php_version_id': '8.0'},
# '3': {'handler_type': 'php-fpm', 'php_version_id': 'no'},
# '4': {'handler_type': 'php-fpm', 'php_version_id': 'no'}}
installed_php_data = php_da_map.values()
installed_list = []
# obtain php version compiled via custombuild: phpXY
for version_info in installed_php_data:
version = version_info['php_version_id']
if version == 'no':
continue
directadmin_php_dir = f'/usr/local/php{version.replace(".", "")}/'
if not os.path.exists(directadmin_php_dir):
continue
modules_dir_path = _get_da_php_extension_dir(directadmin_php_dir)
if modules_dir_path:
modules_dir_path = modules_dir_path.strip()
installed_list.append(PHPDescription(
identifier=f'php{version.replace(".", "")}',
version=version,
dir=os.path.join(directadmin_php_dir),
modules_dir=modules_dir_path,
bin=os.path.join(directadmin_php_dir, 'bin/php'),
ini=os.path.join(directadmin_php_dir, 'lib/php.ini'),
))
return installed_list
def _get_aliases(path):
"""
Parse user aliases file and return data
"""
if not os.path.exists(path):
return []
data = []
try:
with open(path, encoding='utf-8') as f:
data = f.readlines()
except IOError as e:
syslog.syslog(syslog.LOG_WARNING, f'Can`t open file "{path}" due to : "{e}"')
return [record.strip().split('=')[0] for record in data]
def _useraliases(cpuser, domain):
"""
Return aliases from user domain
:param str|unicode cpuser: user login
:param str|unicode domain:
:return list of aliases
"""
path = f'/usr/local/directadmin/data/users/{cpuser}/domains/{domain}.pointers'
data = _get_aliases(path)
return data
class PanelPlugin(GeneralPanelPluginV1):
HTTPD_CONFIG_FILE = '/etc/httpd/conf/httpd.conf'
HTTPD_MPM_CONFIG = '/etc/httpd/conf/extra/httpd-mpm.conf'
HTTPD_INFO_CONFIG = '/etc/httpd/conf/extra/httpd-info.conf'
def __init__(self):
super().__init__()
self.ADMINS_LIST = os.path.join(ADMIN_DIR, 'admin.list')
def getCPName(self):
"""
Return panel name
:return:
"""
return __cpname__
def get_cp_description(self):
"""
Retrieve panel name and it's version
:return: dict: { 'name': 'panel_name', 'version': 'panel_version', 'additional_info': 'add_info'}
or None if can't get info
"""
try:
with subprocess.Popen(
['/usr/local/directadmin/directadmin', 'v'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
) as p:
out, _ = p.communicate()
# output may differ (depending on version):
# 'Version: DirectAdmin v.1.642'
# 'DirectAdmin v.1.643 55acaa256ec6ed99b9aaec1050de793b298f62b0'
# 'DirectAdmin 1.644 55acaa256ec6ed99b9aaec1050de793b298f62b0'
version_words = (word.lstrip('v.') for word in out.split())
def _is_float(s):
return s.replace('.', '').isdigit()
version = next(filter(_is_float, version_words), '')
return {'name': __cpname__, 'version': version, 'additional_info': None}
except Exception:
return None
def db_access(self):
"""
Getting root access to mysql database.
For example {'login': 'root', 'db': 'mysql', 'host': 'localhost', 'pass': '9pJUv38sAqqW'}
:return: root access to mysql database
:rtype: dict
:raises: NoDBAccessData
"""
return db_access()
def cpusers(self):
"""
Generates a list of cpusers registered in the control panel
:return: list of cpusers registered in the control panel
:rtype: tuple
"""
return cpusers()
def resellers(self):
"""
Generates a list of resellers in the control panel
:return: list of cpusers registered in the control panel
:rtype: tuple
"""
return resellers()
def is_reseller(self, username):
"""
Check if given user is reseller;
:type username: str
:rtype: bool
"""
return is_reseller(username)
# unlike admins(), this method works fine in post_create_user
# hook; looks like directadmin updates admins.list a little bit later
# then calls post_create_user.sh
def is_admin(self, username):
"""
Return True if username is in admin names
:param str username: user to check
:return: bool
"""
user_conf_file = os.path.join(DA_USERS_PATH, username, USER_CONF)
if not os.path.exists(user_conf_file):
return False
user_config = load_fast(user_conf_file)
return user_config['usertype'] == 'admin'
def dblogin_cplogin_pairs(self, cplogin_lst=None, with_system_users=False):
"""
Get mapping between system and DB users
@param cplogin_lst :list: list with usernames for generate mapping
@param with_system_users :bool: add system users to result list or no.
default: False
"""
return dblogin_cplogin_pairs(cplogin_lst, with_system_users)
def cpinfo(self, cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns', 'locale'),
search_sys_users=True):
"""
Retrieves info about panel user(s)
:param str|unicode|list|tuple|None cpuser: user login
:param keyls: list of data which is necessary to obtain the user,
the valuescan be:
cplogin - name/login user control panel
mail - Email users
reseller - name reseller/owner users
locale - localization of the user account
package - User name of the package
dns - domain of the user
:param bool search_sys_users: search for cpuser in sys_users or in control panel users (e.g. for Plesk)
:return: returns a tuple of tuples of data in the same sequence as specified keys in keylst
:rtype: tuple
"""
return cpinfo(cpuser, keyls, search_sys_users=search_sys_users)
def get_admin_email(self):
"""
Retrieve admin email address
:return: Host admin's email
"""
return get_admin_email()
def docroot(self, domain):
"""
Return document root for domain
:param str|unicode domain:
:return Cortege: (document_root, owner)
"""
return docroot(domain)
@staticmethod
def useraliases(cpuser, domain):
return _useraliases(cpuser, domain)
def userdomains(self, cpuser):
"""
Return domain and document root pairs for control panel user
first domain is main domain
:param str|unicode cpuser: user login
:return list of tuples (domain_name, documen_root)
"""
return userdomains(cpuser)
def homedirs(self):
"""
Detects and returns list of folders contained the home dirs of users of the cPanel
:return: list of folders, which are parent of home dirs of users of the panel
"""
return homedirs()
def reseller_users(self, resellername=None):
"""
Return reseller users
:param resellername: reseller name; autodetect name if None
:return list[str]: user names list
"""
return reseller_users(resellername)
def reseller_domains(self, resellername=None):
"""
Get dict[user, domain]
:param reseller_name: reseller's name
:rtype: dict[str, str|None]
:raises DomainException: if cannot obtain domains
"""
return reseller_domains(resellername)
def get_user_login_url(self, domain):
"""
Get login url for current panel;
:type domain: str
:rtype: str
"""
return get_user_login_url(domain)
def admins(self):
"""
List all admins names in given control panel
:return: list of strings
"""
return admins()
def domain_owner(self, domain):
"""
Return domain's owner
:param domain: Domain/sub-domain/add-domain name
:rtype: str
:return: user name or None if domain not found
"""
return domain_owner(domain)
def get_domains_php_info(self):
"""
Return php version information for each domain
:return: domain to php info mapping
:rtype: dict[str, dict]
"""
return get_domains_php_info()
@staticmethod
def _get_da_skin_name():
"""
Retrieve current DA skin name
:return: Current DA skin name. None if unknown
"""
config = loadconfig(DA_CONF)
# starting from DA 1.664 `docsroot` option was replaced by `system_skin`
if 'system_skin' in config:
return config['system_skin']
# grep '^docsroot=' /usr/local/directadmin/conf/directadmin.conf | cut -d/ -f4
docsroot = config.get('docsroot', None)
# docsroot like './data/skins/evolution'
if docsroot is None:
return None
return docsroot.split('/')[-1]
@staticmethod
def get_encoding_name():
"""
Retrieve encoding name, used for package/reseller names
:return:
"""
enhanced_skin_config = os.path.join(DA_DIR, "data/skins/enhanced/lang/en/lf_standard.html")
default_encoding = 'utf8'
current_skin = PanelPlugin._get_da_skin_name()
if current_skin == 'enhanced':
# For enchanced skin we read encoding from its config
# :LANG_ENCODING=iso-8859-1 see LU-99 for more info
skin_config = loadconfig(enhanced_skin_config)
# Option in file is 'LANG_ENCODING', but key is lowercase
return skin_config.get('lang_encoding', default_encoding)
return default_encoding
def get_unsupported_cl_features(self) -> tuple[Feature, ...]:
return (
Feature.RUBY_SELECTOR,
)
@staticmethod
def get_apache_ports_list() -> List[int]:
"""
Retrieves active httpd's ports from httpd's config
:return: list of apache's ports
"""
# cat /etc/apache2/conf/httpd.conf | grep Listen
_httpd_ports_list = []
try:
lines = get_file_lines(PanelPlugin.HTTPD_CONFIG_FILE)
except (OSError, IOError):
return None
lines = [line.strip() for line in lines]
for line in grep('Listen', match_any_position=False, multiple_search=True, data_from_file=lines):
# line examples:
# Listen 0.0.0.0:80
# Listen [::]:80
try:
value = int(line.split(' ')[1])
if value not in _httpd_ports_list:
_httpd_ports_list.append(value)
except (IndexError, ValueError):
pass
if not _httpd_ports_list:
_httpd_ports_list.append(80)
return _httpd_ports_list
@staticmethod
def _get_active_web_server_params() -> Tuple[str, str]:
"""
Determines active web server from options.conf, directive 'webserver'
:return: tuple (active_web_server_name, apache_active_module_name)
active_web_server_name: 'apache', 'nginx', 'nginx_apache', 'litespeed', 'openlitespeed', etc
apache_active_module_name: 'prefork', 'event', 'worker'
(None, None) if DA options.conf read/parse error
"""
web_server_name = None
apache_active_module_name = None
try:
# cat /usr/local/directadmin/custombuild/options.conf | grep webserver
# webserver=apache
# webserver can be: apache, nginx, nginx_apache, litespeed, openlitespeed.
options_lines = get_file_lines(DA_OPT_PATH)
grep_result_list = list(grep('^apache_mpm|^webserver', fixed_string=False, match_any_position=False,
multiple_search=True, data_from_file=options_lines))
# grep_result_list example: ['webserver=apache\n', 'apache_mpm=auto\n']
for line in grep_result_list:
line_parts = line.strip().split('=')
if line_parts[0] == 'webserver':
web_server_name = line_parts[1]
if line_parts[0] == 'apache_mpm':
apache_active_module_name = line_parts[1]
# modules are 'prefork', 'event', 'worker'. 'auto' == 'worker'
if apache_active_module_name == 'auto':
apache_active_module_name = 'worker'
except (OSError, IOError, IndexError):
pass
return web_server_name, apache_active_module_name
def _get_max_request_workers_for_module(self, apache_module_name: str) -> Tuple[int, str]:
"""
Determine MaxRequestWorkers directive value for specified apache module.
Reads config file /etc/httpd/conf/extra/httpd-mpm.conf
:param apache_module_name: Current apache's module name: 'prefork', 'event', 'worker'
:return: tuple (max_req_num, message)
max_req_num - Maximum request apache workers number or 0 if error
message - OK/Error message
"""
try:
return find_module_param_in_config(self.HTTPD_MPM_CONFIG,
apache_module_name,
'MaxRequestWorkers')
except (OSError, IOError, IndexError, ValueError):
return 0, format_exc()
def get_apache_max_request_workers(self) -> Tuple[int, str]:
"""
Get current maximum request apache workers from httpd's config
:return: tuple (max_req_num, message)
max_req_num - Maximum request apache workers number or 0 if error
message - OK/Error message
"""
web_server_name, apache_active_module_name = self._get_active_web_server_params()
if web_server_name is None or apache_active_module_name is None:
return 0, f"There was error during read/parse {DA_OPT_PATH}. Apache collector will not work"
if web_server_name != "apache":
return 0, f"DA is configured for web server '{web_server_name}'; but 'apache' is needed. " \
"Apache collector will not work"
return self._get_max_request_workers_for_module(apache_active_module_name)
def _get_httpd_status_uri(self) -> str:
"""
Determine apache mod_status URI from /etc/httpd/conf/extra/httpd-info.conf config
:return Apache mod_status URI or None if error/not found
"""
location_uri = None
try:
# # grep -B 2 'SetHandler server-status' /etc/httpd/conf/extra/httpd-info.conf
#
# <Location /server-status>
# SetHandler server-status
info_lines = get_file_lines(self.HTTPD_INFO_CONFIG)
location_directive = '<Location'
location_line = None
for line in info_lines:
line = line.strip()
if line.startswith(location_directive):
# Location directive found, save it
location_line = line
continue
if line.startswith('SetHandler server-status') and location_line:
# server-status found, Extract URI from Location directive start tag
location_uri = location_line.replace(location_directive, '').replace('>', '').strip()
break
except (OSError, IOError):
pass
return location_uri
def get_apache_connections_number(self):
"""
Retrieves Apache's connections number (from apache's mod_status)
:return: tuple (conn_num, message)
conn_num - current connections number, 0 if error
message - OK/Trace
"""
web_server_name, _ = self._get_active_web_server_params()
if web_server_name is None:
return 0, f"There was error during read/parse {DA_OPT_PATH}. Apache collector will not work"
if web_server_name != "apache":
return 0, f"DA is configured for web server '{web_server_name}'; but 'apache' is needed. " \
"Apache collector will not work"
try:
# curl localhost/server-status?auto | grep "Total Accesses"
# Total Accesses: 25
location_uri = self._get_httpd_status_uri()
if location_uri is None:
return 0, "Can't found mod_status URI in configs"
url = f'http://127.0.0.1{location_uri}?auto'
response = requests.get(url, timeout=5)
if response.status_code != 200:
return 0, f"GET {url} response code is {response.status_code}"
s_response = response.content.decode('utf-8')
s_response_list = s_response.split('\n')
out_list = list(grep("Total Accesses", data_from_file=s_response_list))
# out_list example: ['Total Accesses: 200']
s_total_accesses = out_list[0].split(':')[1].strip()
return int(s_total_accesses), 'OK'
except Exception:
return 0, format_exc()
@staticmethod
def get_installed_php_versions():
"""
Returns installed alt-php(s) on the server
compiled phpXY via custombuild and alt-phpXY has different paths
also user could choose version via PHP selector which was not compiled with custombuild
(will be absent in DA configs)
"""
return _get_installed_alt_php_versions() + _get_compiled_custombuild_versions()
def get_server_ip(self):
ip_list_file = '/usr/local/directadmin/data/admin/ip.list'
if not os.path.exists(ip_list_file):
return ''
with open(ip_list_file, encoding='utf-8') as f:
ips = f.readlines()
if not ips:
return ''
return ips[0].strip()
@staticmethod
def get_user_emails_list(username: str, domain: str):
user_conf = f'/usr/local/directadmin/data/users/{username}/user.conf'
if not os.path.exists(user_conf):
return ''
user_conf = load_fast(user_conf)
return user_conf.get('email', '')
@staticmethod
def panel_login_link(username):
generated_login = subprocess.run(['/usr/local/directadmin/directadmin',
'--create-login-url', f'user={username}'],
capture_output=True, text=True, check=False).stdout
# http://server-206-252-237-2.da.direct:2222/api/login/url?key=0SrJm1CNAIh34w4Fk8Kp4ohypUFp_pMm
if len(generated_login) == 0:
return ''
parsed = urlparse(generated_login)
return f'{parsed.scheme}://{parsed.netloc}/'
@staticmethod
def panel_awp_link(username):
link = PanelPlugin.panel_login_link(username).rstrip("/")
if len(link) == 0:
return ''
return f'{link}/evo/user/plugins/awp#/'
def suspended_users_list(self):
all_users = cpusers()
suspended_users = []
for user in all_users:
user_conf_file = os.path.join(DA_USERS_PATH, user, USER_CONF)
if not os.path.exists(user_conf_file):
continue
user_config = load_fast(user_conf_file)
if user_config.get('suspended') == 'yes':
suspended_users.append(user)
return suspended_users