import random
import os
import string
from datetime import datetime, timedelta
from pathlib import Path
from defence360agent.subsys.panels.base import InvalidTokenException
from defence360agent.contracts.config import UIRole, UserType
from defence360agent.utils import atomic_rewrite
# Translates the UI role stored in a token's "user_type" claim into the
# UserType value that JWTIssuer.parse_token hands back to its caller.
UIRoleToUserType = {
    UIRole.ADMIN: UserType.ROOT,
    UIRole.CLIENT: UserType.NON_ROOT,
}
class JWTIssuer:
JWT_SECRET_FILE = Path("/var/imunify360/.api-secret.key")
JWT_SECRET_FILE_PREV = Path("/var/imunify360/.api-secret-prev.key")
TOKEN_EXPIRATION_TTL = timedelta(hours=6)
SECRET_EXPIRATION_TTL = timedelta(days=1)
@classmethod
def is_secret_expired(cls):
try:
stat = os.stat(cls.JWT_SECRET_FILE)
except FileNotFoundError:
st_mtime = 0.0
else:
st_mtime = stat.st_mtime
return (
datetime.now().timestamp() - st_mtime
> cls.SECRET_EXPIRATION_TTL.seconds
)
@classmethod
def _get_secret(cls) -> str:
if cls.is_secret_expired():
new_secret = "".join(
random.choice(string.ascii_uppercase + string.digits)
for _ in range(64)
)
if not cls.JWT_SECRET_FILE.exists():
cls.JWT_SECRET_FILE.touch()
atomic_rewrite(
str(cls.JWT_SECRET_FILE),
new_secret,
backup=str(cls.JWT_SECRET_FILE_PREV),
uid=-1,
permissions=0o600,
)
return new_secret
else:
return cls.JWT_SECRET_FILE.read_text()
@classmethod
def get_token(cls, user_name: str, user_type: UIRole) -> str:
"""
Generates a token with several encoded fields:
user name,
user type,
expiration timestamp
"""
import jwt
return jwt.encode(
{
"user_type": user_type,
"username": user_name,
"exp": (datetime.now() + cls.TOKEN_EXPIRATION_TTL).timestamp(),
},
cls._get_secret(),
)
@classmethod
def _parse_token(cls, token: str, secret: str) -> dict | None:
import jwt
# if handle these exceptions at global level,
# jwt shoud be imported there,
# increasing memory consumation
try:
return jwt.decode(token, secret, algorithms=["HS256"])
except jwt.PyJWTError:
pass
@classmethod
def parse_token(cls, token: str):
for secret_pth in [cls.JWT_SECRET_FILE, cls.JWT_SECRET_FILE_PREV]:
if not secret_pth.exists():
continue
decoded = cls._parse_token(token, secret_pth.read_text())
if decoded:
return {
"user_name": decoded["username"],
"user_type": UIRoleToUserType[decoded["user_type"]],
}
else:
raise InvalidTokenException("INVALID_TOKEN")