import asyncio import logging import os import sys import defence360agent.internals.logger from defence360agent.contracts.config import Core as Config from defence360agent.simple_rpc import SUCCESS, SocketError from defence360agent.utils import is_root_user from defence360agent.utils.cli import ( EXIT_CODES, EXITCODE_GENERAL_ERROR, print_error, print_response, print_warnings, ) from defence360agent.utils.parsers import EnvParser, create_cli_parser from defence360agent.sentry import flush_sentry logger = logging.getLogger(__name__) def main(rpc_handlers_init, cli_args): # get ready to start: set conservative umask os.umask(Config.FILE_UMASK) defence360agent.internals.logger.reconfigure() rpc_handlers_init() parser = create_cli_parser() args = parser.parse_args(args=cli_args) if args.log_config or os.environ.get("IMUNIFY360_LOGGING_CONFIG_FILE"): defence360agent.internals.logger.update_logging_config_from_file( args.log_config or os.environ.get("IMUNIFY360_LOGGING_CONFIG_FILE") ) if args.console_log_level: defence360agent.internals.logger.setConsoleLogLevel( args.console_log_level ) if hasattr(args, "endpoint") and hasattr(args, "generate_endpoint_params"): try: cli_kwargs = args.generate_endpoint_params(args) envvar_kwargs = EnvParser.parse( os.environ, args.command, args.envvar_parameter_options, exclude=cli_kwargs, ) result, data = args.endpoint(**envvar_kwargs, **cli_kwargs) print_warnings(data) flush_sentry() if result == SUCCESS: print_response(args.command, data, args.json, args.verbose) else: print_error(result, data, args.json, args.verbose) sys.exit(EXIT_CODES[result]) except SocketError as e: print_response( None, {"items": "ERROR: {}".format(e)}, args.json, args.verbose ) sys.exit(EXITCODE_GENERAL_ERROR) else: print(parser.format_help()) def entrypoint(rpc_handlers_init): if not is_root_user(): logger.info("%s could be used by the root user only!", Config.NAME) print( "Imunify360 CLI is unavailable for non-root user", file=sys.stderr ) sys.exit(EXITCODE_GENERAL_ERROR) try: main(rpc_handlers_init, sys.argv[1:]) # ensure loop is closed to prevent asyncio warning # (https://bugs.python.org/issue23548) asyncio.get_event_loop().close() except KeyboardInterrupt: logger.warning("User pressed Ctrl+C, exiting...") sys.exit(EXITCODE_GENERAL_ERROR) except Exception: logger.exception( "Unknown error happened. See logs for more information" ) sys.exit(EXITCODE_GENERAL_ERROR)
Name | Type | Size | Permission | Actions |
---|---|---|---|---|
__pycache__ | Folder | 0755 |
|
|
api | Folder | 0755 |
|
|
application | Folder | 0755 |
|
|
contracts | Folder | 0755 |
|
|
feature_management | Folder | 0755 |
|
|
files | Folder | 0755 |
|
|
hooks | Folder | 0755 |
|
|
internals | Folder | 0755 |
|
|
migrations | Folder | 0755 |
|
|
model | Folder | 0755 |
|
|
mr_proper | Folder | 0755 |
|
|
myimunify | Folder | 0755 |
|
|
plugins | Folder | 0755 |
|
|
rpc_tools | Folder | 0755 |
|
|
simple_rpc | Folder | 0755 |
|
|
subsys | Folder | 0755 |
|
|
utils | Folder | 0755 |
|
|
__init__.py | File | 0 B | 0644 |
|
__main__.py | File | 43 B | 0644 |
|
_version.py | File | 82 B | 0644 |
|
defence360.py | File | 2.9 KB | 0644 |
|
migrate.py | File | 3.01 KB | 0644 |
|
router.py | File | 1.65 KB | 0644 |
|
run.py | File | 109 B | 0644 |
|
sentry.py | File | 2.85 KB | 0644 |
|