# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
import json
import os
import subprocess
import logging
import raven
from clcommon.public_hooks import (
POST_MODIFY_DOMAIN,
PRE_MODIFY_USER,
POST_MODIFY_USER
)
from clcommon.utils import run_command, ExternalProgramFailed, is_user_present
from secureio import makedirs_secure, write_file_secure, read_file_secure
EXIT_NO_USER_FOUND = 1
ERR_NO_USER_FOUND = 'Unable to find username in hook cmdline'
WHMAPI1 = '/usr/sbin/whmapi1'
LVE_DIR = '/var/lve'
TMP_DIR = os.path.join(LVE_DIR, 'tmp')
logger = logging.getLogger(__name__)
def print_response(hook_name, success=True):
"""
cPanel expects that each custom hook
prints two values in the end of the execution:
- status, where 1 means success
- message, which explains non-1 statuses
otherwise nothing really breaks, but logs
are full of "script returned invalid response" msgs
:param hook_name: name, path or anything else to fill
message with in order to understand
what exactly failed
:param success: is it everything ended successfully?
:return: Nothing
"""
if not success:
print(0, f"Failed to execute hook {hook_name}; you can find logs in "
"/var/log/cloudlinux/hooks/info.log "
"and contact CloudLinux Support if you need "
"help with the issue.")
else:
print(1, "Ok")
# ATTENTION: we use this function in
# processpaneluserspackages and lvemanager
def call_sync_map():
"""
Run lvectl sync-map and log possible stdout|err in case of errors.
:return: None
"""
with subprocess.Popen(
['lvectl', 'sync-map'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
) as proc:
stdout, stderr = proc.communicate()
if proc.returncode != 0:
logger.error('Error during "lvectl sync-map", code: %s, '
'stderr: `%s`, stdout: `%s`. Reseller limits '
'kernel mapping might be not synchronized.'
'Contact CloudLinux Support for help.'
'', proc.returncode, stdout, stderr)
def cpanel_postwwwacct_main(data):
"""
Post create account hook of cPanel
:return: None
"""
user = data.get('user', None)
owner = data.get('owner', None)
if not user:
logger.warning(ERR_NO_USER_FOUND)
return EXIT_NO_USER_FOUND
return subprocess.call([
POST_MODIFY_USER, 'create',
'--username', user, '--owner', owner],
env={'CPANEL_RESTORE': str(data.get('is_restore', 0))})
def cpanel_prekillacct_main(data):
"""
Pre kill account hook of cPanel
:return: None
"""
# It's necessary destroy lve before remove user home directory,
# otherwise will be error due busy mount points of cagefs
user = data.get('user', None)
if not user:
logger.warning(ERR_NO_USER_FOUND)
return EXIT_NO_USER_FOUND
if not is_user_present(user):
logger.warning('User %s does not present in the system, skip hook', user)
return 0
return subprocess.call([
PRE_MODIFY_USER, 'delete',
'--username', user])
def cpanel_postkillacct_main(data):
"""
Post kill account hook of cPanel
:return: None
"""
user = data.get('user', None)
if not user:
logger.warning(ERR_NO_USER_FOUND)
return EXIT_NO_USER_FOUND
return subprocess.call([
POST_MODIFY_USER, 'delete',
'--username', user])
def _read_old_domain(user):
"""
Read old domain for modified account
:param user: name of user
:return: old domain
:rtype: str
"""
domain = None
filename = os.path.join(TMP_DIR, user)
try:
content = read_file_secure(filename, uid=0, gid=0, exit_on_error=False,
# This log is intended to be used only by
# cagefs update command
write_log=False)
domain = content[0]
except (IndexError, OSError, IOError):
# use Raven carefully and only in places where
# you sure that sentry is already initialized
raven.base.Raven.captureException(
message='failed to read old domain for user (pre hook no called?)')
return domain
def cpanel_postmodifyacct_main(data):
"""
Post modify account hook of cPanel
:return: None
"""
user = data.get('user', None)
new_user = data.get('newuser', None)
domain = data.get('domain', None)
exit_code = 0
# changing owner of user
# FIXME: this check does not work because cpanel sends `owner` always
new_owner = data.get('owner', data.get('OWNER'))
args = [POST_MODIFY_USER, 'modify', '-u', user]
if new_owner:
args += ['--new-owner', new_owner]
if all((user, new_user,)) and user != new_user:
args += ['--new-username', new_user]
exit_code += subprocess.call(args)
old_domain = _read_old_domain(user)
if domain is not None and domain != old_domain:
# looks like domain is renamed
exit_code += subprocess.call([
POST_MODIFY_DOMAIN,
'modify', '--username', user if new_user is None else new_user,
'--domain', old_domain, '--new-domain', domain,
'--include-subdomains'])
return exit_code
def cpanel_postrestoreacct_main(data):
"""
Post restore account hook of cPanel
:return: None
"""
user = data.get('user', None)
if not user:
logger.warning(ERR_NO_USER_FOUND)
return EXIT_NO_USER_FOUND
return subprocess.call([
POST_MODIFY_USER, 'restore',
'--username', user])
def _get_old_domain(user):
"""
Get old domain for modified account
:param user: name of user
:return: old domain
:rtype: str
"""
domain = None
try:
cmd = [
WHMAPI1,
'listaccts',
f'search={user}',
'searchtype=user',
'searchmethod=exact',
'want=domain',
'--output=json'
]
std_out = run_command(cmd, return_full_output=True)[1] # take only std_out, ignore std_err
data = json.loads(std_out)
domain = data['data']['acct'][0]['domain']
except (ExternalProgramFailed, IndexError, KeyError):
# use Raven carefully and only in places where
# you sure that sentry is already initialized
raven.base.Raven.captureException(message='failed to get old domain for user from cpanel')
return domain
def cpanel_premodifyacct_main(data):
"""
Pre modify account hook of cPanel
:return: None
"""
user = data.get('user')
# getting old domain
# TODO: why not cpapi?
domain = _get_old_domain(user)
if domain is None:
return 0
# save old domain to file
filename = os.path.join(TMP_DIR, user)
makedirs_secure(TMP_DIR, perm=0o750, uid=0, gid=0, parent_path=LVE_DIR)
write_file_secure([domain], filename, uid=0, gid=0, perm=0o700)
return 0