# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
# General class, implementing common methods, using all cpapi plugins by default
import collections
import functools
import json
import os
import subprocess
from pwd import getpwuid

from clcommon.features import (
    Feature,
)
from clcommon.cpapi.cpapiexceptions import (
    NotSupported,
    CPAPIExternalProgramFailed
)
from clcommon.lock import acquire_lock
def _not_supported(func):
def _exception(*a, **kw):
raise NotSupported(f'"{func.__name__}" api not supported')
return _exception
# Default external script executed by GeneralPanelPluginV1._run_long_script
GET_CP_PACKAGE_SCRIPT = "/usr/bin/getcontrolpaneluserspackages"
# File read by GeneralPanelPluginV1.get_hosting_accounts_count
PANEL_USERS_COUNT_FILE = "/var/lve/panel_users_count"
# Directory where GeneralPanelPluginV1.cache_call keeps "<function name>.cache" files
CPAPI_CACHE_STORAGE = "/var/clcpapi"
class GeneralPanelPluginV1:
    """
    Base control panel plugin providing default implementations of cpapi methods.

    Package and user queries are answered by running an external script
    (``GET_CP_PACKAGE_SCRIPT`` unless ``_custom_script_name`` is changed) and
    parsing its stdout. Methods decorated with ``_not_supported`` raise
    NotSupported when called.
    """

    def __init__(self):
        # Executable invoked by _run_long_script
        self._custom_script_name = GET_CP_PACKAGE_SCRIPT

    def invalidate_cpapi_cache(self):
        """
        Invalidate cached cpapi data. Does nothing in this base implementation.
        """

    @staticmethod
    def is_cache_valid(cpapi_cache, panel_markers):
        """
        Check whether the cache file is not older than any of the marker files.

        :param cpapi_cache: path to the cache file
        :param panel_markers: iterable of marker file paths
        :rtype: bool
        :return: True if the cache file and every marker exist and no marker
                 has a modification time newer than the cache; False otherwise
        """
        try:
            # Stat each file exactly once: a separate exists() check followed by
            # getmtime() could raise if a file disappears in between
            cache_mtime = os.path.getmtime(cpapi_cache)
            # if at least 1 marker is newer than the cache -> cache is invalid
            return all(os.path.getmtime(marker) <= cache_mtime
                       for marker in panel_markers)
        except (OSError, ValueError):
            # missing/inaccessible file or malformed path -> treat as invalid
            return False

    @staticmethod
    def rewrite_cpapi_cache(actual_data, cache_file):
        """
        Store data in the cache file as JSON of the form ``{"data": actual_data}``.

        The write is performed while holding the ``<cache_file>.lock`` lock.

        :param actual_data: JSON-serializable data to store
        :param cache_file: path to the cache file
        """
        try:
            with acquire_lock(cache_file + '.lock'):
                with open(cache_file, 'w', encoding='utf-8', errors='surrogateescape') as f:
                    json.dump({'data': actual_data}, f, indent=4)
        except PermissionError:
            # it's ok if we cannot update cache because we run as user/mockbuild
            pass

    @staticmethod
    def cache_call(**decorator_kwargs):
        """
        Decorator factory caching the result of a function in a JSON file.

        The cache file is ``CPAPI_CACHE_STORAGE/<function name>.cache``. The file
        name depends only on the function name, so calls with different
        arguments share the same cache entry.

        :param decorator_kwargs: must contain the key ``panel_parker`` - list of
            marker file paths passed to is_cache_valid. The key is looked up on
            each call of the decorated function, so a missing key raises
            KeyError at call time.
        :return: decorator; the wrapper it produces exposes the undecorated
                 function as ``__cached_func__``
        """
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                cache_file = os.path.join(CPAPI_CACHE_STORAGE, func.__name__ + ".cache")
                # NOTE(review): the key is spelled 'panel_parker'; callers outside
                # this file presumably pass it under this name - do not rename it
                # without updating them.
                cache_valid = GeneralPanelPluginV1.is_cache_valid(
                    cache_file, decorator_kwargs['panel_parker'])
                if cache_valid:
                    try:
                        with open(cache_file, "r", encoding='utf-8', errors='surrogateescape') as f:
                            return json.load(f)['data']
                    except Exception:
                        # deliberate best effort: unreadable or broken json ->
                        # fall through to the api call and re-write the cache
                        pass
                data = func(*args, **kwargs)
                GeneralPanelPluginV1.rewrite_cpapi_cache(data, cache_file)
                return data
            wrapper.__cached_func__ = func
            return wrapper
        return decorator

    def getCPName(self):
        """
        Return panel name
        :rtype: str
        :return: Name of panel
        """
        return "GeneralPanel"

    def _run_long_script(self, args):
        """
        Just wraps long script calls.
        :param args: list of arguments to pass to the script
        :return: tuple (stdout, stderr, returncode); stdout and stderr are str
        """
        with subprocess.Popen(
            [self._custom_script_name] + args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        ) as p:
            out, err = p.communicate()
            returncode = p.returncode
        return out, err, returncode

    def admin_packages(self, raise_exc=False):
        """
        Return list of available admin's packages
        :param raise_exc: raise exception on exit code != 0
        :return: List of packages. For example
            ['BusinessPackage', 'Package2']
        :raises: CPAPIExternalProgramFailed if raise_exc is set and the script fails
        """
        stdout, stderr, returncode = self._run_long_script(['--list-packages'])
        if raise_exc and returncode != 0:
            raise CPAPIExternalProgramFailed(
                stderr or f'Failed to get information about packages: {stdout}')
        # one package per line, empty lines are skipped
        return [line for line in stdout.strip().split('\n') if line != '']

    def get_reseller_users(self, reseller):
        """
        Return users of the given reseller
        :param reseller: reseller name
        :rtype: dict
        :return: Dictionary uid => {'package': package_name, 'reseller': reseller}
        """
        reseller_users = {}
        out, _, _ = self._run_long_script(['--list-reseller-users=' + str(reseller)])
        for line in out.strip().split('\n'):
            # only comma-separated lines (uid,package) are processed
            if ',' not in line:
                continue
            fields = line.split(',')
            reseller_users[int(fields[0])] = {
                'package': fields[1],
                'reseller': reseller}
        return reseller_users

    def get_uids_list_by_package(self, package_name, reseller_name=None):
        """
        Retrieves uid list for package
        :param package_name: Package name
        :param reseller_name: Reseller name. None for admin's package
        :rtype: List
        :return: List of uids as they are printed by the script (strings),
            empty list if the script could not be executed
            Example: ['1000', '1002', '1006', '1007', '1008']
        """
        uid_list = []
        try:
            args = ['--package=' + str(package_name)]
            if reseller_name is not None:
                args.append('--reseller=' + str(reseller_name))
            stdout, _, _ = self._run_long_script(args)
            uid_list = stdout.split('\n')
            # Drop only the empty element produced by a trailing newline, so the
            # last uid is kept when the output does not end with a newline
            if uid_list and uid_list[-1] == '':
                uid_list.pop()
        except (OSError, IOError, AttributeError):
            # deliberate best effort: report no uids if the script cannot be run
            pass
        return uid_list

    def list_all(self, raise_exc=False):
        """
        Return packages of all users
        :param raise_exc: raise exception on exit code != 0
        :rtype: dict
        :return: Dictionary uid => package_name
        :raises: CPAPIExternalProgramFailed if raise_exc is set and the script fails
        """
        uid_package_map = {}
        out, _, returncode = self._run_long_script(['--list-all'])
        if raise_exc and returncode != 0:
            raise CPAPIExternalProgramFailed(
                f"Failed to get list of users and their packages: {out}")
        # if script prints error - skip output processing
        if 'error:' not in out:
            for line in out.split('\n'):
                # 0 - uid, 1 - package_name; other lines are ignored
                parts = line.split(' ', 1)
                if len(parts) == 2:
                    uid_package_map[int(parts[0])] = parts[1]
        return uid_package_map

    def list_users(self, raise_exc=False):
        """
        Return package and reseller of all users
        :param raise_exc: raise exception on exit code != 0
        :rtype: dict
        :return: Dictionary uid => {'package': package_name, 'reseller': reseller_name}
        :raises: CPAPIExternalProgramFailed if raise_exc is set and the script fails
        """
        users = {}
        out, err, returncode = self._run_long_script(['--list-users'])
        if raise_exc and returncode != 0:
            raise CPAPIExternalProgramFailed(
                err or f'Failed to get information about users: {out}')
        for line in out.strip().split('\n'):
            # only comma-separated lines (uid,package,reseller) are processed
            if ',' not in line:
                continue
            fields = line.split(',')
            users[int(fields[0])] = {'package': fields[1], 'reseller': fields[2]}
        return users

    def resellers_packages(self, raise_exc=False):
        """
        Return dictionary, contains available resellers packages, grouped by resellers
        :param raise_exc: raise exception on exit code != 0
        :return: Dictionary.
            Example:
            {'res1': ['BusinessPackage', 'UltraPackage', 'Package'],
             'res2': ['SimplePackage', 'Package'] }
        :raises: CPAPIExternalProgramFailed if raise_exc is set and the script fails
        """
        resellers_packages = collections.defaultdict(list)
        out, err, returncode = self._run_long_script(['--list-resellers-packages'])
        if raise_exc and returncode != 0:
            raise CPAPIExternalProgramFailed(
                err or f'Failed to get information about reseller package: {out}')
        # each processed output line has the form "<reseller_name> <package_name>"
        for line in out.split('\n'):
            line = line.strip()
            # Pass empty and invalid lines
            if not line:
                continue
            # 0 - reseller_name, 1 - package_name
            line_parts = line.split(' ', 1)
            if len(line_parts) != 2:
                continue
            res_name, pack_name = line_parts
            resellers_packages[res_name].append(pack_name)
        return resellers_packages

    def reseller_package_by_uid(self, user_id):
        """
        Return reseller and package of the user
        :param user_id: user's uid
        :rtype: tuple
        :return: (reseller, package) - first output lines of the corresponding
            script calls
        """
        # Get package for user
        out, _, _ = self._run_long_script(['--userid=' + str(user_id)])
        package = out.split('\n')[0]
        # Get reseller for user
        out, _, _ = self._run_long_script(['--get-user-reseller=' + str(user_id)])
        reseller = out.split('\n')[0]
        return reseller, package

    def admins(self):
        """
        List all admins names in given control panel
        :rtype: List
        :return: list of strings
        """
        return ['root']

    def is_admin(self, username):
        """
        Return True if username is in admin names
        :param str username: user to check
        :return: bool
        """
        return username in self.admins()

    @_not_supported
    def get_cp_description(self):
        """
        Retrieve panel name and its version
        :return: dict: { 'name': 'panel_name', 'version': 'panel_version', 'additional_info': 'add_info'}
            or None if can't get info
        """

    @_not_supported
    def resellers(self):
        """
        Generates a list of resellers in the control panel
        :return: tuple of cpusers registered in the control panel
        :rtype: tuple
        :raise: NotSupported
        """

    @_not_supported
    def is_reseller(self, username):
        """
        Check if user is reseller;
        :type username: str
        :rtype: bool
        """

    @_not_supported
    def db_access(self):
        """
        Getting root access to mysql database.
        For example {'login': 'root', 'db': 'mysql', 'host': 'localhost', 'pass': '9pJUv38sAqqW'}
        :return: root access to mysql database
        :rtype: dict
        :raises: NoDBAccessData, NotSupported
        """

    @_not_supported
    def dblogin_cplogin_pairs(self, cplogin_lst=None, with_system_users=False):
        """
        Returns a list of pairs, the database user login - user login control panel
        For example:
            (('nata2_someuse', 'nata2'), ('testsome_dfrtbus', 'testsome'))
        :param list|tuple|None cplogin_lst: list of control panel users
        :param bool with_system_users: add system users to dbmapping
        :return: list of pairs, the database user login - user login control panel
        :rtype: tuple
        :raises: NotSupported, NoPackage
        """

    @_not_supported
    def cpinfo(self, cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns'),
               search_sys_users=True):
        """
        Retrieves information about panel users
        :param str|unicode|list|tuple|None cpuser: user login
        :param keyls: - list of data which is necessary to obtain the user, the values can be:
            cplogin - name/login user control panel
            mail - Email users
            reseller - name reseller/owner users
            locale - localization of the user account
            package - User name of the package
            dns - domain of the user
            userid - user's uid
        :param search_sys_users:
        :return: returns a tuple of tuples of data in the same sequence as specified keys in keyls
        :rtype: tuple
        """

    @_not_supported
    def get_admin_emails_list(self):
        """
        Gets admin emails list
        :rtype: List
        :return: List: ['admin1@mail.com', 'admin2@mail.com' ]
        """

    @_not_supported
    def docroot(self, domain):
        """
        Return document root for domain
        :return: Tuple: (document_root, owner)
        """

    @_not_supported
    def userdomains(self, cpuser):
        """
        Return domain and document root pairs for control panel user
        first domain is main domain
        :param str|unicode cpuser: user login
        :rtype: List
        :return: list of tuples (domain_name, document_root)
        """

    @_not_supported
    def homedirs(self):
        """
        Detects and returns list of folders contained the home dirs of users of the cPanel
        :rtype: List
        :return: list of folders, which are parent of home dirs of users of the panel
        """

    @_not_supported
    def reseller_users(self, resellername=None):
        """
        Return reseller users
        :param resellername: reseller name; autodetect name if None
        :rtype: List
        :return list[str]: user names list
        """

    @_not_supported
    def reseller_domains(self, resellername=None):
        """
        Return reseller users and their main domains
        :param resellername: reseller name; autodetect name if None
        :rtype: dict
        :return dict[str, str]: pairs user <==> domain
        """

    @_not_supported
    def get_user_login_url(self, domain):
        """
        Get login url for current panel;
        :type domain: str
        :rtype: str
        :return: Panel login URL
        """

    @_not_supported
    def get_reseller_id_pairs(self):
        """
        Get dict reseller => id
        Optional method for panels without hard
        link reseller <=> system user
        :rtype: dict[str,int] - {'res1': id1}
        :return:
        """

    # not documented yet
    @_not_supported
    def get_domains_php_info(self):
        """
        Retrieves dictionary information about php versions for each domain
        {
            'domain.com': {
                'php_version_id': 'ea-php70'
                'php_handler': lsapi | fpm | cgi | fastcgi
        }}
        :rtype: dict
        """

    @_not_supported
    def get_installed_php_versions(self):
        """
        Retrieves list of php versions installed in panel
        :rtype: list
        """

    # not documented yet
    @_not_supported
    def get_system_php_info(self):
        """
        Retrieves dictionary with system information about php
        :rtype: dict
        """

    @_not_supported
    def get_admin_locale(self):
        """
        :rtype: str
        """

    @staticmethod
    @_not_supported
    def get_encoding_name():
        """
        Retrieve encoding name, used for package/reseller names, from panel
        :return:
        """

    def get_unsupported_cl_features(self) -> tuple[Feature, ...]:
        """
        Return tuple of CloudLinux features that cannot
        be used with current control panel.
        :raises: NotImplementedError in this base implementation
        """
        raise NotImplementedError()

    @staticmethod
    def get_apache_ports_list():
        """
        Retrieves active httpd's ports from httpd's config
        :return: list of apache's ports; always [80] in this base implementation
        """
        return [80]

    @staticmethod
    def get_apache_connections_number():
        """
        Retrieves Apache's connections number (from mod_status)
        For CM
        :return: tuple (connections_number, message); always (0, "OK") in this
            base implementation
        """
        # Unsupported panel
        return 0, "OK"

    @staticmethod
    def get_apache_max_request_workers():
        """
        Get current maximum request apache workers from httpd's config
        :return: tuple (max_req_num, message)
            max_req_num - Maximum request apache workers number or 0 if error
            message - OK/Trace
        """
        # Unsupported panel
        return 0, "OK"

    @staticmethod
    def get_main_username_by_uid(uid: int) -> str:
        """
        Get "main" panel username by uid.
        :param uid: uid
        :return: Username, or 'N/A' if there is no passwd entry for the uid
        """
        try:
            # Get username by id
            return getpwuid(uid).pw_name
        except KeyError:
            return 'N/A'

    @staticmethod
    def get_user_emails_list(username: str, domain: str) -> str:
        """
        Get emails of the user; always an empty string in this base implementation
        """
        return ''

    @staticmethod
    def panel_login_link(username):
        """
        Get panel login link; always an empty string in this base implementation
        """
        return ''

    @staticmethod
    def panel_awp_link(username):
        """
        Get panel AWP link; always an empty string in this base implementation
        """
        return ''

    @staticmethod
    def get_hosting_accounts_count() -> int:
        """
        Get users count
        :return: number of users read from PANEL_USERS_COUNT_FILE, 0 if the
            file is absent or contains nothing but whitespace
        """
        if os.path.isfile(PANEL_USERS_COUNT_FILE):
            with open(PANEL_USERS_COUNT_FILE, 'r', encoding='utf-8') as f:
                # strip so that a whitespace-only file counts as empty
                count = f.read().strip()
            if count:
                return int(count)
        return 0

    def get_customer_login(self, username):
        """
        99% of control panels log-in user with same username
        as system user has, except for Plesk
        """
        return username

    def get_domain_login(self, username, domain):
        """
        99% of control panels log-in user with same username
        as system user has, in the Plesk panel we need a subscription login
        """
        return username

    def get_server_ip(self):
        """
        Get ip of the server that is configured in control panel to be "main".
        :raises: NotSupported in this base implementation
        """
        raise NotSupported('Unable to detect main ip for this server. '
                           'Contact CloudLinux support and report the issue.')

    def suspended_users_list(self):
        """
        Get list of suspended users; always an empty list in this base implementation
        """
        return []