"""Gather information from Plesk via DB queries.
"""
import logging
import re
from collections import defaultdict
from typing import Dict, List, Sequence
from defence360agent.utils import CheckRunError, check_run, retry_on
from defence360agent.subsys.panels.base import PanelException
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
async def raise_panel_exception(*args, **kwargs):
    """Raise a PanelException constructed from the given arguments.

    All positional and keyword arguments are forwarded unchanged to the
    PanelException constructor. This coroutine never returns normally.
    """
    error = PanelException(*args, **kwargs)
    raise error
@retry_on(CheckRunError, max_tries=3, on_error=raise_panel_exception)
async def _run_query(query):
    """Run *query* through ``plesk db -N -e`` and return its decoded output.

    The call is wrapped in ``retry_on`` for CheckRunError with
    ``max_tries=3`` and ``raise_panel_exception`` as the error callback.
    NOTE(review): presumably this means a PanelException is raised once the
    tries are exhausted -- confirm against ``retry_on``.
    """
    command = ["plesk", "db", "-N", "-e", query]
    raw_output = await check_run(command)
    return raw_output.decode()
async def get_user_to_domain() -> Dict[str, List[str]]:
    """Return mapping: system user login -> list of that user's domains.

    The query output is consumed as a flat, whitespace-separated stream of
    alternating ``login`` / ``name`` tokens. Pairs whose domain token is the
    literal string ``NULL`` are skipped, so a user that only has such pairs
    gets no entry in the result.
    """
    query = (
        "select login, name from domains "
        " left join hosting on dom_id = domains.id "
        " right join sys_users on hosting.sys_user_id = sys_users.id"
    )
    tokens = (await _run_query(query)).split()
    domains_by_user = defaultdict(list)
    # Reading the same iterator twice per step yields consecutive
    # (login, domain) pairs; a dangling last token is dropped.
    fields = iter(tokens)
    for login, domain_name in zip(fields, fields):
        if domain_name == "NULL":
            continue
        domains_by_user[login].append(domain_name)
    return domains_by_user
async def get_domain_to_user() -> Dict[str, List[str]]:
    """Return mapping: domain name -> single-element list with its user.

    The query output is consumed as a flat, whitespace-separated stream of
    alternating ``name`` / ``login`` tokens. If a domain name occurs more
    than once, the last pair wins.

    NOTE(review): both joins are left joins and the login token is stored
    without any filtering, so a login printed as ``NULL`` would end up in
    the result as the user -- confirm whether that is intended.
    """
    query = (
        "select name, login "
        "from domains "
        " left join hosting on dom_id = domains.id "
        " left join sys_users on hosting.sys_user_id = sys_users.id"
    )
    tokens = (await _run_query(query)).split()
    users_by_domain = {}
    # Reading the same iterator twice per step yields consecutive
    # (domain, login) pairs; a dangling last token is dropped.
    fields = iter(tokens)
    for domain_name, login in zip(fields, fields):
        users_by_domain[domain_name] = [login]
    return users_by_domain
async def get_user_details() -> Dict[str, Dict[str, str]]:
    """
    Return mapping: client login -> {"email": ..., "locale": ...}.

    Each non-empty output line is expected to be ``login;email;locale``;
    a line with a different number of ``;``-separated fields raises
    ValueError. Dashes in the locale are replaced with underscores.

    Not used, because MyImunify is implemented for cPanel only so far.
    """
    output = await _run_query(
        "SELECT CONCAT(login, ';',email, ';',locale) FROM clients;"
    )
    details = {}
    # filter(None, ...) drops the empty strings left by blank lines.
    for line in filter(None, output.split("\n")):
        login, email, locale = line.split(";")
        details[login] = {
            "email": email,
            "locale": locale.replace("-", "_"),
        }
    return details
async def get_domains() -> List[str]:
    """Return the whitespace-separated tokens of ``select name from domains``.

    In other words: the list of domain names stored in the ``domains`` table.
    """
    output = await _run_query("select name from domains")
    return output.split()
async def get_users() -> List[str]:
    """Return logins of the users created by Plesk.

    Selects ``sys_users.login`` for system users joined through ``hosting``
    to a domain whose ``webspace_id`` is 0, and returns the output split on
    whitespace.
    """
    query = (
        "SELECT sys_users.login FROM sys_users JOIN hosting ON"
        " hosting.sys_user_id=sys_users.id JOIN domains ON"
        " hosting.dom_id=domains.id AND domains.webspace_id=0"
    )
    output = await _run_query(query)
    return output.split()
async def count_customers_with_subscriptions() -> int:  # pragma: no cover
    """Return the number of active customers owning at least one domain.

    Counts distinct ``cl_id`` values over ``clients`` joined with
    ``domains``, restricted to clients with ``status = 0``. The query output
    is converted with ``int()``, which raises ValueError if it is not a
    single integer.
    """
    query = (
        "select count(distinct cl_id) from clients c join domains d on "
        "d.cl_id = c.id where c.status = 0"
    )
    output = await _run_query(query)
    return int(output)
async def get_admin_emails() -> list:
    """Return the emails of clients whose ``type`` is ``admin``.

    The query output is split on newlines and empty lines are discarded.
    """
    output = await _run_query("SELECT email FROM clients WHERE type='admin';")
    # filter(None, ...) drops the empty strings left by blank lines.
    return list(filter(None, output.split("\n")))
async def list_docroots() -> Sequence[str]:
    """Return the distinct ``www_root`` values from the ``hosting`` table.

    The output is split one value per line rather than on arbitrary
    whitespace, so a document root containing spaces stays a single entry
    instead of being broken into several bogus paths. Empty lines (such as
    the one after the trailing newline) are discarded.
    """
    query = "SELECT DISTINCT hosting.www_root FROM hosting;"
    output = await _run_query(query)
    return [docroot for docroot in output.split("\n") if docroot]
async def list_docroots_domains_users() -> List[List[str]]:
    """Return ``[www_root, domain name, system user login]`` rows.

    One row is produced per non-empty output line, covering every hosting
    record that has both a domain and a system user (inner joins).

    Columns are split on the tab character rather than on arbitrary
    whitespace, so a ``www_root`` containing spaces stays in one column and
    every row keeps exactly three fields.
    NOTE(review): relies on the client printing tab-separated columns when
    its output is captured -- confirm for ``plesk db``.
    """
    sql = (
        "select hosting.www_root, domains.name, sys_users.login"
        " from hosting"
        " inner join domains on hosting.dom_id = domains.id"
        " inner join sys_users on hosting.sys_user_id=sys_users.id"
    )
    data = await _run_query(sql)
    return [row.split("\t") for row in data.split("\n") if row]