from __future__ import absolute_import import sys import types from sentry_sdk._functools import wraps from sentry_sdk.hub import Hub from sentry_sdk._compat import reraise from sentry_sdk.utils import capture_internal_exceptions, event_from_exception from sentry_sdk.integrations import Integration from sentry_sdk.integrations.logging import ignore_logger from sentry_sdk._types import MYPY if MYPY: from typing import Any from typing import Iterator from typing import TypeVar from typing import Optional from typing import Callable from sentry_sdk.client import Client from sentry_sdk._types import ExcInfo T = TypeVar("T") F = TypeVar("F", bound=Callable[..., Any]) WRAPPED_FUNC = "_wrapped_{}_" INSPECT_FUNC = "_inspect_{}" # Required format per apache_beam/transforms/core.py USED_FUNC = "_sentry_used_" class BeamIntegration(Integration): identifier = "beam" @staticmethod def setup_once(): # type: () -> None from apache_beam.transforms.core import DoFn, ParDo # type: ignore ignore_logger("root") ignore_logger("bundle_processor.create") function_patches = ["process", "start_bundle", "finish_bundle", "setup"] for func_name in function_patches: setattr( DoFn, INSPECT_FUNC.format(func_name), _wrap_inspect_call(DoFn, func_name), ) old_init = ParDo.__init__ def sentry_init_pardo(self, fn, *args, **kwargs): # type: (ParDo, Any, *Any, **Any) -> Any # Do not monkey patch init twice if not getattr(self, "_sentry_is_patched", False): for func_name in function_patches: if not hasattr(fn, func_name): continue wrapped_func = WRAPPED_FUNC.format(func_name) # Check to see if inspect is set and process is not # to avoid monkey patching process twice. # Check to see if function is part of object for # backwards compatibility. process_func = getattr(fn, func_name) inspect_func = getattr(fn, INSPECT_FUNC.format(func_name)) if not getattr(inspect_func, USED_FUNC, False) and not getattr( process_func, USED_FUNC, False ): setattr(fn, wrapped_func, process_func) setattr(fn, func_name, _wrap_task_call(process_func)) self._sentry_is_patched = True old_init(self, fn, *args, **kwargs) ParDo.__init__ = sentry_init_pardo def _wrap_inspect_call(cls, func_name): # type: (Any, Any) -> Any from apache_beam.typehints.decorators import getfullargspec # type: ignore if not hasattr(cls, func_name): return None def _inspect(self): # type: (Any) -> Any """ Inspect function overrides the way Beam gets argspec. """ wrapped_func = WRAPPED_FUNC.format(func_name) if hasattr(self, wrapped_func): process_func = getattr(self, wrapped_func) else: process_func = getattr(self, func_name) setattr(self, func_name, _wrap_task_call(process_func)) setattr(self, wrapped_func, process_func) # getfullargspec is deprecated in more recent beam versions and get_function_args_defaults # (which uses Signatures internally) should be used instead. try: from apache_beam.transforms.core import get_function_args_defaults return get_function_args_defaults(process_func) except ImportError: return getfullargspec(process_func) setattr(_inspect, USED_FUNC, True) return _inspect def _wrap_task_call(func): # type: (F) -> F """ Wrap task call with a try catch to get exceptions. Pass the client on to raise_exception so it can get rebinded. """ client = Hub.current.client @wraps(func) def _inner(*args, **kwargs): # type: (*Any, **Any) -> Any try: gen = func(*args, **kwargs) except Exception: raise_exception(client) if not isinstance(gen, types.GeneratorType): return gen return _wrap_generator_call(gen, client) setattr(_inner, USED_FUNC, True) return _inner # type: ignore def _capture_exception(exc_info, hub): # type: (ExcInfo, Hub) -> None """ Send Beam exception to Sentry. """ integration = hub.get_integration(BeamIntegration) if integration is None: return client = hub.client if client is None: return event, hint = event_from_exception( exc_info, client_options=client.options, mechanism={"type": "beam", "handled": False}, ) hub.capture_event(event, hint=hint) def raise_exception(client): # type: (Optional[Client]) -> None """ Raise an exception. If the client is not in the hub, rebind it. """ hub = Hub.current if hub.client is None: hub.bind_client(client) exc_info = sys.exc_info() with capture_internal_exceptions(): _capture_exception(exc_info, hub) reraise(*exc_info) def _wrap_generator_call(gen, client): # type: (Iterator[T], Optional[Client]) -> Iterator[T] """ Wrap the generator to handle any failures. """ while True: try: yield next(gen) except StopIteration: break except Exception: raise_exception(client)
Name | Type | Size | Permission | Actions |
---|---|---|---|---|
__pycache__ | Folder | 0755 |
|
|
django | Folder | 0755 |
|
|
spark | Folder | 0755 |
|
|
__init__.py | File | 6.36 KB | 0644 |
|
_wsgi_common.py | File | 4.65 KB | 0644 |
|
aiohttp.py | File | 7.77 KB | 0644 |
|
argv.py | File | 945 B | 0644 |
|
asgi.py | File | 8.17 KB | 0644 |
|
atexit.py | File | 1.79 KB | 0644 |
|
aws_lambda.py | File | 12.64 KB | 0644 |
|
beam.py | File | 5.53 KB | 0644 |
|
boto3.py | File | 3.64 KB | 0644 |
|
bottle.py | File | 6.04 KB | 0644 |
|
celery.py | File | 8.91 KB | 0644 |
|
chalice.py | File | 4.47 KB | 0644 |
|
dedupe.py | File | 1.14 KB | 0644 |
|
excepthook.py | File | 2.14 KB | 0644 |
|
executing.py | File | 1.98 KB | 0644 |
|
falcon.py | File | 6.64 KB | 0644 |
|
flask.py | File | 7.25 KB | 0644 |
|
gcp.py | File | 7.21 KB | 0644 |
|
gnu_backtrace.py | File | 2.84 KB | 0644 |
|
logging.py | File | 7.4 KB | 0644 |
|
modules.py | File | 1.36 KB | 0644 |
|
pure_eval.py | File | 4.41 KB | 0644 |
|
pyramid.py | File | 6.91 KB | 0644 |
|
redis.py | File | 3 KB | 0644 |
|
rq.py | File | 4.68 KB | 0644 |
|
sanic.py | File | 7.53 KB | 0644 |
|
serverless.py | File | 1.92 KB | 0644 |
|
sqlalchemy.py | File | 2.87 KB | 0644 |
|
stdlib.py | File | 7.18 KB | 0644 |
|
threading.py | File | 2.79 KB | 0644 |
|
tornado.py | File | 6.84 KB | 0644 |
|
trytond.py | File | 1.69 KB | 0644 |
|
wsgi.py | File | 10.13 KB | 0644 |
|