"""Helper for integrate sentry in stand-alone scripts""" import json import os import subprocess from contextlib import suppress from pathlib import Path from typing import List import distro import sentry_sdk IMUNIFY360 = "imunify360" IMUNIFYAV = "imunify-antivirus" IMUNIFY360_PKG = "imunify360-firewall" LICENSE = "/var/imunify360/license.json" LICENSE_FREE = "/var/imunify360/license-free.json" FREE_ID = "IMUNIFYAV" UNKNOWN_ID = "UNKNOWN" SENTRY_DSN_PATH = Path("/opt/imunify360/venv/share/imunify360/sentry") SENTRY_DSN_DEFAULT = "https://6de77a2763bd40c58fc9e3a89285aaa8@im360.sentry.cloudlinux.com/3?timeout=20" # noqa: E501 def get_sentry_dsn() -> str: """Return dsn from the file or the default one.""" try: return SENTRY_DSN_PATH.read_text(encoding="ascii").strip() except (OSError, UnicodeDecodeError): return SENTRY_DSN_DEFAULT def get_server_id() -> str: with suppress(Exception): for filename in [LICENSE, LICENSE_FREE]: with suppress(FileNotFoundError), open(filename) as file: return json.load(file)["id"] return UNKNOWN_ID def collect_output(cmd: List[str]) -> str: try: cp = subprocess.run( cmd, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, ) except OSError: return "" if cp.returncode != 0: return "" return os.fsdecode(cp.stdout) def get_rpm_version(pkg: str) -> str: cmd = ["rpm", "-q", "--queryformat=%{VERSION}-%{RELEASE}", pkg] return collect_output(cmd) def get_dpkg_version(pkg: str) -> str: cmd = ["dpkg-query", "--showformat=${Version}", "--show", pkg] return collect_output(cmd) def get_current_os(): platform_os = distro.linux_distribution()[0] return platform_os.lower() def get_package_name(): platform_os = get_current_os() service_name = IMUNIFY360_PKG if platform_os != "ubuntu" and get_rpm_version(IMUNIFYAV): service_name = IMUNIFYAV else: service_name = IMUNIFY360 return service_name def get_service_version(service_name) -> str: platform_os = get_current_os() if platform_os != "ubuntu": version = 
get_rpm_version(service_name) else: version = get_dpkg_version(service_name) return version def configure_sentry(): # using LoggingIntegration (contained in default Integrations) # logging event with *error* level will be reported to Sentry automatically sentry_sdk.init(dsn=get_sentry_dsn()) with sentry_sdk.configure_scope() as scope: package = get_package_name() scope.user = {"id": get_server_id()} scope.set_tag("name", package) scope.set_tag("version", get_service_version(package)) def flush_sentry(): client = sentry_sdk.Hub.current.client if client is not None: client.flush(timeout=2.0)
Name | Type | Size | Permission | Actions |
---|---|---|---|---|
__pycache__ | Folder | | 0755 | |
api | Folder | | 0755 | |
application | Folder | | 0755 | |
contracts | Folder | | 0755 | |
feature_management | Folder | | 0755 | |
files | Folder | | 0755 | |
hooks | Folder | | 0755 | |
internals | Folder | | 0755 | |
migrations | Folder | | 0755 | |
model | Folder | | 0755 | |
mr_proper | Folder | | 0755 | |
myimunify | Folder | | 0755 | |
plugins | Folder | | 0755 | |
rpc_tools | Folder | | 0755 | |
simple_rpc | Folder | | 0755 | |
subsys | Folder | | 0755 | |
utils | Folder | | 0755 | |
__init__.py | File | 0 B | 0644 | |
__main__.py | File | 43 B | 0644 | |
_version.py | File | 82 B | 0644 | |
defence360.py | File | 2.9 KB | 0644 | |
migrate.py | File | 3.01 KB | 0644 | |
router.py | File | 1.65 KB | 0644 | |
run.py | File | 109 B | 0644 | |
sentry.py | File | 2.85 KB | 0644 | |