from __future__ import absolute_import
import sys
from sentry_sdk import configure_scope
from sentry_sdk.hub import Hub
from sentry_sdk.integrations import Integration
from sentry_sdk.utils import (
capture_internal_exceptions,
exc_info_from_error,
single_exception_from_error_tuple,
walk_exception_chain,
event_hint_with_exc_info,
)
from sentry_sdk._types import MYPY
if MYPY:
from typing import Any
from typing import Optional
from sentry_sdk._types import ExcInfo, Event, Hint
class SparkWorkerIntegration(Integration):
identifier = "spark_worker"
@staticmethod
def setup_once():
# type: () -> None
import pyspark.daemon as original_daemon
original_daemon.worker_main = _sentry_worker_main
def _capture_exception(exc_info, hub):
# type: (ExcInfo, Hub) -> None
client = hub.client
client_options = client.options # type: ignore
mechanism = {"type": "spark", "handled": False}
exc_info = exc_info_from_error(exc_info)
exc_type, exc_value, tb = exc_info
rv = []
# On Exception worker will call sys.exit(-1), so we can ignore SystemExit and similar errors
for exc_type, exc_value, tb in walk_exception_chain(exc_info):
if exc_type not in (SystemExit, EOFError, ConnectionResetError):
rv.append(
single_exception_from_error_tuple(
exc_type, exc_value, tb, client_options, mechanism
)
)
if rv:
rv.reverse()
hint = event_hint_with_exc_info(exc_info)
event = {"level": "error", "exception": {"values": rv}}
_tag_task_context()
hub.capture_event(event, hint=hint)
def _tag_task_context():
# type: () -> None
from pyspark.taskcontext import TaskContext
with configure_scope() as scope:
@scope.add_event_processor
def process_event(event, hint):
# type: (Event, Hint) -> Optional[Event]
with capture_internal_exceptions():
integration = Hub.current.get_integration(SparkWorkerIntegration)
task_context = TaskContext.get()
if integration is None or task_context is None:
return event
event.setdefault("tags", {}).setdefault(
"stageId", str(task_context.stageId())
)
event["tags"].setdefault("partitionId", str(task_context.partitionId()))
event["tags"].setdefault(
"attemptNumber", str(task_context.attemptNumber())
)
event["tags"].setdefault(
"taskAttemptId", str(task_context.taskAttemptId())
)
if task_context._localProperties:
if "sentry_app_name" in task_context._localProperties:
event["tags"].setdefault(
"app_name", task_context._localProperties["sentry_app_name"]
)
event["tags"].setdefault(
"application_id",
task_context._localProperties["sentry_application_id"],
)
if "callSite.short" in task_context._localProperties:
event.setdefault("extra", {}).setdefault(
"callSite", task_context._localProperties["callSite.short"]
)
return event
def _sentry_worker_main(*args, **kwargs):
# type: (*Optional[Any], **Optional[Any]) -> None
import pyspark.worker as original_worker
try:
original_worker.main(*args, **kwargs)
except SystemExit:
if Hub.current.get_integration(SparkWorkerIntegration) is not None:
hub = Hub.current
exc_info = sys.exc_info()
with capture_internal_exceptions():
_capture_exception(exc_info, hub)