# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
import os
import pwd
import grp
from contextlib import contextmanager
from .clexception import FormattedException
class ClPwd:
LOGIN_DEF_FILE = '/etc/login.defs'
class NoSuchUserException(FormattedException):
def __init__(self, user, *args, **kwargs):
super(ClPwd.NoSuchUserException, self).__init__({
'message': "No such user (%(user)s)",
'context': {'user': user}
}, *args, **kwargs)
def __init__(self, min_uid=None):
self._user_key_map = {}
self._uid_key_map = {}
self._user_full_map = {}
self._uid_full_map = {}
if min_uid is None:
self._min_uid = self.get_sys_min_uid(500)
else:
self._min_uid = min_uid
def get_user_dict(self):
self._load_passwd_database()
return self._user_key_map
def get_uid_dict(self):
self._load_passwd_database()
return self._uid_key_map
def get_user_full_dict(self):
self._load_passwd_database()
return self._user_full_map
def get_uid_full_dict(self):
self._load_passwd_database()
return self._uid_full_map
def get_pw_by_name(self, user):
"""
Return pw_entry for user
"""
try:
return self.get_user_full_dict()[user]
except KeyError as e:
raise ClPwd.NoSuchUserException(user) from e
def get_pw_by_uid(self, uid):
"""
Return list of passwd entries for uid
"""
try:
return self.get_uid_full_dict()[uid]
except KeyError as e:
raise ClPwd.NoSuchUserException(uid) from e
def get_uid(self, user):
"""
Returns uid for user
"""
try:
return self.get_user_full_dict()[user].pw_uid
except KeyError as e:
raise ClPwd.NoSuchUserException(user) from e
def get_homedir(self, user):
"""
Returns homedir for a user
@param user: string
@return: string
"""
try:
return self.get_user_full_dict()[user].pw_dir
except KeyError as e:
raise ClPwd.NoSuchUserException(user) from e
def _load_passwd_database(self):
"""
Loads the passwd database and fills user_to_uid and user_to_homedir maps
"""
if not self._uid_full_map:
for entry in pwd.getpwall():
self._user_full_map[entry.pw_name] = entry
if entry.pw_uid not in self._uid_full_map:
self._uid_full_map[entry.pw_uid] = []
self._uid_full_map[entry.pw_uid].append(entry)
if entry.pw_uid >= self._min_uid:
self._user_key_map[entry.pw_name] = entry
if entry.pw_uid not in self._uid_key_map:
self._uid_key_map[entry.pw_uid] = []
self._uid_key_map[entry.pw_uid].append(entry)
def get_names(self, uid):
"""
Return names of users with uid specified
@param uid: int
@return: list of strings
"""
try:
entries = self.get_uid_full_dict()[uid]
except KeyError as e:
raise ClPwd.NoSuchUserException(uid) from e
return [entry.pw_name for entry in entries]
def get_sys_min_uid(self, def_min_uid=500):
"""
Return system defined MIN_UID from /etc/login.def or def_min_uid
@param def_min_uid: int
@return: MIN_UID: int
"""
if os.path.exists(self.LOGIN_DEF_FILE):
with open(self.LOGIN_DEF_FILE, 'r', encoding='utf-8') as f:
lines = f.readlines()
for line in lines:
if line.startswith('UID_MIN'):
try:
return int(line.split('UID_MIN')[1].strip())
except ValueError:
pass
return def_min_uid
def drop_user_privileges(user=None, effective_or_real=True, set_env=True):
"""
Drop current root privileges to user
:param effective_or_real: if True - drop euid, else - drop ruid
:param user: name of unix user
:param set_env: bool -> if true set $HOME and $USER env variables
:return: None
"""
current_euid = os.geteuid()
if current_euid == 0 and user:
user_pwd = pwd.getpwnam(user)
# set user's groups
user_groups = [group.gr_gid for group in grp.getgrall() if user in group.gr_mem]
main_user_group = user_pwd.pw_gid
if main_user_group not in user_groups:
user_groups.append(main_user_group)
os.setgroups(user_groups)
# set effective uid and gid
if effective_or_real:
os.setegid(user_pwd.pw_gid)
os.seteuid(user_pwd.pw_uid)
else:
os.setgid(user_pwd.pw_gid)
os.setuid(user_pwd.pw_uid)
if set_env:
# set user's env vars
os.environ['USER'] = user
os.environ['HOME'] = user_pwd.pw_dir
@contextmanager
def drop_privileges(user):
"""
Temporary drop privileges to some user
:type user: str
:raises: ClPwd.NoSuchUserException
"""
old_uid, old_gid, old_groups = os.getuid(), os.getgid(), os.getgroups()
try:
drop_user_privileges(user, effective_or_real=True, set_env=False)
except KeyError as e:
raise ClPwd.NoSuchUserException(user) from e
try:
yield
finally:
os.seteuid(old_uid)
os.setegid(old_gid)
os.setgroups(old_groups)
def _resolve_doc_root_by_user(user):
"""
Resolve document root by username
:param user: str -> name of unix user
:return: str -> document root
"""
if user is None:
raise FormattedException('Cannot resolve docroot without specified user')
# to avoid circular imports
from .cpapi import userdomains # pylint: disable=cyclic-import,import-outside-toplevel
from .cpapi.cpapiexceptions import NoDomain # pylint: disable=cyclic-import,import-outside-toplevel
domains_list = userdomains(user)
# get document root for main domain
try:
_, result = domains_list[0]
except IndexError as e:
raise NoDomain({
'message': 'No domain for user %(user)s found',
'context': {'user': user},
}) from e
return result
def resolve_username_and_doc_root(user=None, domain=None):
"""
Resolve username and doc_root by domain,
or resolve document root by username,
or resolve document root and username by effective uid
:param user: str -> name of unix user
:param domain: str -> domain of panel user
:return: tuple -> user, doc_root
"""
# to avoid circular imports
from .cpapi import docroot # pylint: disable=cyclic-import,import-outside-toplevel
from .cpapi.cpapiexceptions import IncorrectData # pylint: disable=cyclic-import,import-outside-toplevel
result_user = user
result_doc_root = None
current_euid = os.geteuid()
if domain is not None:
doc_root, domain_user = docroot(domain)
if user is None:
result_user = domain_user
elif user != domain_user:
raise IncorrectData("User and domain are in conflict")
result_doc_root = doc_root
elif user is not None: # we can obtain user name for domain
result_doc_root = _resolve_doc_root_by_user(user=user)
elif current_euid != 0: # get doc_root and username by current euid
result_user = pwd.getpwuid(current_euid).pw_name
result_doc_root = _resolve_doc_root_by_user(user=result_user)
return result_user, result_doc_root