# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
import os
import re
import signal
import subprocess
import sys
# procfs file whose leading characters carry the LVE version
LVE_FILE = '/proc/lve/list'


def get_lve_version():
    """
    Read the LVE procfs version from the beginning of LVE_FILE.

    :return: two-item list: [version as int, 'OK'] on success,
        otherwise [None, error message]
    """
    try:
        with open(LVE_FILE, encoding='utf-8') as proc_file:
            # the version prefix fits into the first three characters
            head = proc_file.read(3)
        version_text = head.rsplit(':', 1)[0]
        return [int(version_text), 'OK']
    except OSError:
        return [None, f'clcommon: get_lve_version: Can`t open file {LVE_FILE}']
    except IndexError:
        return [None, f'clcommon: get_lve_version: Can`t get data from {LVE_FILE}']
    except ValueError:
        return [None, f'clcommon: get_lve_version: Can`t parse {LVE_FILE}']
# Size-suffix multipliers, expressed relative to the 'K' unit
BYTES_CONVERSION_TABLE = {
    'K': 1024 ** 0,
    'M': 1024 ** 1,
    'G': 1024 ** 2,
    'T': 1024 ** 3,
}
def validate_cpu(val: str | int) -> str | int | None:
"""
Check that the value is a valid CPU limit (0-100 int or speed (% or MHz/GHz)).
Return the value if valid, otherwise None.
If the value contains decimal part, return only integer part.
"""
data = str(val)
# Validate integer CPU limits
regexp_int = re.compile(r'^([1-9][0-9]?|100)(?:\.(\d+))?$')
regex_match = regexp_int.match(data)
if regex_match:
if isinstance(val, (float, int)):
return int(val)
return regex_match.group(1) # return only integer part
# Validate percentage speeds
regexp_speedp = re.compile(r'^([1-9]|[1-9][0-9]*)(?:\.(\d+))?%$')
regex_match = regexp_speedp.match(data)
if regex_match:
return f'{regex_match.group(1)}%'
# Validate frequency speeds
regexp_speedf = re.compile(r'^([1-9]|[1-9][0-9]*)(?:\.(\d+))?(mhz|ghz)$', re.IGNORECASE)
regex_match = regexp_speedf.match(data)
if regex_match:
integer_part, _, unit = regex_match.groups()
return f'{integer_part}{unit}'
return None
def validate_int(val, min_val=0, max_val=sys.maxsize):
    """
    Check that val represents an integer within [min_val, max_val].

    :param val: value to check; anything int() can convert
    :param min_val: smallest accepted value (inclusive)
    :param max_val: largest accepted value (inclusive)
    :return: val itself, unchanged and unconverted, when it is valid;
        None otherwise
    """
    try:
        number = int(val)
    except (TypeError, ValueError):
        # ValueError: text that is not an integer;
        # TypeError: None or another object int() cannot convert
        return None
    if min_val <= number <= max_val:
        return val
    return None
def memory_to_page(val, min_val=0, max_val=sys.maxsize):
    """
    Convert a memory size string such as '512M' into a number of pages.

    The last character of val is the unit suffix, looked up case-insensitively
    in BYTES_CONVERSION_TABLE; a value that ends with a digit is treated as
    having the 'K' suffix. The converted amount is divided by 4 to get the
    page count (presumably 4 KB pages - confirm against the LVE documentation).

    :param val: size string, optionally ending with a unit suffix
    :param min_val: smallest accepted page count (inclusive)
    :param max_val: largest accepted page count (inclusive)
    :return: page count as int when val is valid and within the bounds,
        None otherwise
    """
    try:
        suffix = val[-1]
        if suffix.isdigit():
            suffix = 'K'
            val = val + suffix
        result = int(float(val[:-1]) * BYTES_CONVERSION_TABLE[suffix.upper()] / 4)
        if max_val >= result >= min_val:
            return result
    except (IndexError, ValueError, KeyError, OverflowError):
        # IndexError: empty string; ValueError: non-numeric amount or NaN;
        # KeyError: unknown suffix; OverflowError: infinite amount
        # (e.g. 'infM' or '1e999'), which int() refuses to convert
        pass
    return None
def page_to_memory(pages):
    """
    Render a page count as a human-readable size string.

    Each page counts as 4 units of 'K'. Results below 1024K are returned
    in 'K' without rounding; larger ones are returned in 'M' or 'G',
    rounded to two decimal places.

    :param pages: number of pages
    :return: size string with a 'K', 'M' or 'G' suffix
    """
    if pages < 256:  # less than 1M
        return f'{pages * 4}K'
    if pages < 262144:  # less than 1G
        divisor, suffix = 1024, 'M'
    else:
        divisor, suffix = 1024 * 1024, 'G'
    return f'{round(float(pages) * 4 / divisor, 2)}{suffix}'
def reload_processes(item, username):
    """
    Send SIGHUP to the processes of a user whose ps command column contains item.

    The process list is taken from '/bin/ps -U <username> -u <username>'.
    Rows that cannot be parsed and processes that cannot be signalled
    are skipped silently.

    :param item: text searched for in the last column of each ps row
    :param username: user whose processes are inspected
    """
    ps_command = ['/bin/ps', '-U', username, '-u', username]
    with subprocess.Popen(ps_command, stdout=subprocess.PIPE) as ps_proc:
        ps_output, _ = ps_proc.communicate()
        for row in ps_output.split(b"\n"):
            columns = row.rstrip().split()
            try:
                command = columns[-1]
                if item.encode() not in command:
                    continue
                # the first column is the PID (the header row fails int())
                os.kill(int(columns[0]), signal.SIGHUP)
            except (IndexError, ValueError, OSError):
                continue
def login_defs(key, default=None, _path='/etc/login.defs'):
    """
    Look up a setting in a login.defs style file.

    :param key: setting name; must be the first word of its line
    :param default: value returned when the setting is not found
    :param _path: file to read
    :return: second word of the first line that starts with key
        (as a string), or default
    """
    with open(_path, encoding='utf-8') as defs_file:
        for entry in defs_file:
            if entry.startswith('#'):
                continue  # the whole line is a comment
            # drop a trailing comment, then split into words
            words = entry.partition('#')[0].split()
            if len(words) >= 2 and words[0] == key:
                return words[1]
    return default
def uid_max(default=60000):
    """
    Return the UID_MAX setting from login.defs as an int.

    :param default: value returned when the file cannot be read, or the
        setting is missing or not an integer
    """
    try:
        return int(login_defs('UID_MAX'))
    except (OSError, ValueError, TypeError):
        # unreadable file, non-numeric value, or missing setting (None)
        return default
def exit_with_error(message, status=1):
    """
    Write message followed by a newline to stderr and terminate.

    :param message: text to report
    :param status: exit status passed to SystemExit
    """
    stream = sys.stderr
    stream.write(f"{message}\n")
    raise SystemExit(status)
def safe_escaped_unicode_to_utf(s):
    """
    Resolve unicode escape sequences (such as '\\u00e9') contained in a string.

    :param s: str that may contain backslash-escaped unicode symbols
    :return: str with the escapes replaced by the characters they denote;
        s is returned unchanged when it contains no '\\u' marker or when
        its escape sequences are malformed
    """
    if '\\u' in s:  # str should have escaped unicode symbols
        try:
            # str has no .decode() in Python 3, so go through bytes.
            # latin-1 + backslashreplace keeps characters that are already
            # non-ASCII intact: code points up to 0xFF become single bytes
            # that unicode_escape reads back as-is, and everything above is
            # written as an escape sequence that is decoded with the rest.
            s = s.encode('latin-1', 'backslashreplace').decode('unicode_escape')
        except UnicodeDecodeError:
            # malformed or truncated escape sequence: keep the text as it was
            pass
    return s
# Copy/paste helper to convert unicode results of json.load back to str
# https://stackoverflow.com/questions/956867/how-to-get-string-objects-instead-of-unicode-from-json/13105359#13105359
def byteify(data):
    """
    Recursively encode every str found in data into bytes.

    Dicts (keys and values), lists and tuples are rebuilt with their
    contents converted; any other object is returned as is.
    """
    if isinstance(data, str):
        return data.encode()
    if isinstance(data, dict):
        return {byteify(name): byteify(item) for name, item in data.items()}
    if isinstance(data, list):
        return list(map(byteify, data))
    if isinstance(data, tuple):
        return tuple(map(byteify, data))
    return data
def unicodeify(data):
    """
    Recursively decode every bytes object found in data into str.

    Dicts (keys and values), lists and tuples are rebuilt with their
    contents converted; any other object is returned as is.
    """
    if isinstance(data, bytes):
        return data.decode()
    if isinstance(data, dict):
        return {unicodeify(name): unicodeify(item) for name, item in data.items()}
    if isinstance(data, list):
        return list(map(unicodeify, data))
    if isinstance(data, tuple):
        return tuple(map(unicodeify, data))
    return data
def is_ascii_string(s):
    """
    Check whether a string contains only ASCII characters
    :param s: string to check
    :return: True - the string contains only ASCII characters, False otherwise
    """
    encoded = s.encode(encoding='utf-8')
    # in UTF-8 every non-ASCII character produces bytes with the high bit set
    return all(byte < 0x80 for byte in encoded)
def escape_formatting_chars(text: str) -> str:
    """
    Escape '%' characters inside text, except '%' followed by '('

    Every '%' that is not immediately followed by '(' is doubled, so the
    result can be used as a %-format string in which only '%(name)s' style
    placeholders remain active:
        '% hello'      -> '%% hello'
        '%% hello'     -> '%%%% hello'
        '100%'         -> '100%%'
        '%(name)s 5%'  -> '%(name)s 5%%'

    :param text: text to escape
    :return: escaped text
    """
    # One negative lookahead handles '%' in the middle, at the very end and
    # right before a trailing newline alike. Anchoring the end with '$' is
    # avoided on purpose: '$' also matches just before a final newline, so a
    # '%' located there would be doubled twice.
    return re.sub(r"%(?!\()", "%%", text)