`worker.py` — the SDK's background worker, a daemon thread that drains a bounded queue of callables so event sending never blocks the calling thread:

```python
import os
import threading

from time import sleep, time
from sentry_sdk._compat import check_thread_support
from sentry_sdk._queue import Queue, Full
from sentry_sdk.utils import logger

from sentry_sdk._types import MYPY

if MYPY:
    from typing import Any
    from typing import Optional
    from typing import Callable


# Sentinel placed on the queue to tell the worker thread to exit.
_TERMINATOR = object()


# Drains a bounded queue of callables on a daemon thread.
class BackgroundWorker(object):
    def __init__(self, queue_size=30):
        # type: (int) -> None
        check_thread_support()
        self._queue = Queue(queue_size)  # type: Queue
        self._lock = threading.Lock()
        self._thread = None  # type: Optional[threading.Thread]
        self._thread_for_pid = None  # type: Optional[int]

    @property
    def is_alive(self):
        # type: () -> bool
        # A fork invalidates the thread: report dead if the pid changed.
        if self._thread_for_pid != os.getpid():
            return False
        if not self._thread:
            return False
        return self._thread.is_alive()

    def _ensure_thread(self):
        # type: () -> None
        if not self.is_alive:
            self.start()

    def _timed_queue_join(self, timeout):
        # type: (float) -> bool
        # Wait up to `timeout` seconds for all queued jobs to be marked done.
        deadline = time() + timeout
        queue = self._queue

        queue.all_tasks_done.acquire()

        try:
            while queue.unfinished_tasks:
                delay = deadline - time()
                if delay <= 0:
                    return False

                queue.all_tasks_done.wait(timeout=delay)

            return True
        finally:
            queue.all_tasks_done.release()

    def start(self):
        # type: () -> None
        with self._lock:
            if not self.is_alive:
                self._thread = threading.Thread(
                    target=self._target, name="raven-sentry.BackgroundWorker"
                )
                # Daemon thread so it never blocks interpreter shutdown.
                self._thread.setDaemon(True)
                self._thread.start()
                self._thread_for_pid = os.getpid()

    def kill(self):
        # type: () -> None
        """
        Kill worker thread. Returns immediately. Not useful for
        waiting on shutdown for events, use `flush` for that.
        """
        logger.debug("background worker got kill request")
        with self._lock:
            if self._thread:
                try:
                    self._queue.put_nowait(_TERMINATOR)
                except Full:
                    logger.debug("background worker queue full, kill failed")

                self._thread = None
                self._thread_for_pid = None

    def flush(self, timeout, callback=None):
        # type: (float, Optional[Any]) -> None
        logger.debug("background worker got flush request")
        with self._lock:
            if self.is_alive and timeout > 0.0:
                self._wait_flush(timeout, callback)
        logger.debug("background worker flushed")

    def _wait_flush(self, timeout, callback):
        # type: (float, Optional[Any]) -> None
        # Try a short join first; if jobs are still pending, report the count
        # via the callback, then wait out the remaining timeout.
        initial_timeout = min(0.1, timeout)
        if not self._timed_queue_join(initial_timeout):
            pending = self._queue.qsize()
            logger.debug("%d event(s) pending on flush", pending)
            if callback is not None:
                callback(pending, timeout)
            self._timed_queue_join(timeout - initial_timeout)

    def submit(self, callback):
        # type: (Callable[[], None]) -> None
        self._ensure_thread()
        try:
            self._queue.put_nowait(callback)
        except Full:
            logger.debug("background worker queue full, dropping event")

    def _target(self):
        # type: () -> None
        # Worker loop: run each queued callback, log failures, mark the task done.
        while True:
            callback = self._queue.get()
            try:
                if callback is _TERMINATOR:
                    break
                try:
                    callback()
                except Exception:
                    logger.error("Failed processing job", exc_info=True)
            finally:
                self._queue.task_done()
            # Yield to other threads between jobs.
            sleep(0)
```
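For reference, a minimal sketch of driving the worker directly, not how the SDK normally uses it (the transport in `transport.py` submits its own send callbacks); `send_event` is a hypothetical stand-in job:

```python
from sentry_sdk.worker import BackgroundWorker


def send_event():
    # type: () -> None
    # Hypothetical job; in the SDK this would be a "send this envelope" callable.
    print("event sent")


worker = BackgroundWorker(queue_size=10)
worker.submit(send_event)   # lazily starts the daemon thread via _ensure_thread()
worker.flush(timeout=2.0)   # block up to ~2 seconds for queued jobs to finish
worker.kill()               # enqueue _TERMINATOR and detach from the thread
```

If the bounded queue is full, `submit` silently drops the job (logged at debug level), which keeps the worker from ever applying backpressure to the application.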
Contents of the `sentry_sdk` package directory:

| Name | Type | Size | Permission |
|---|---|---|---|
| __pycache__ | Folder | | 0755 |
| integrations | Folder | | 0755 |
| __init__.py | File | 854 B | 0644 |
| _compat.py | File | 2.3 KB | 0644 |
| _functools.py | File | 2.22 KB | 0644 |
| _queue.py | File | 8.21 KB | 0644 |
| _types.py | File | 1.26 KB | 0644 |
| api.py | File | 4.69 KB | 0644 |
| attachments.py | File | 1.75 KB | 0644 |
| client.py | File | 14.03 KB | 0644 |
| consts.py | File | 3.5 KB | 0644 |
| debug.py | File | 1.11 KB | 0644 |
| envelope.py | File | 8.17 KB | 0644 |
| hub.py | File | 21.35 KB | 0644 |
| py.typed | File | 0 B | 0644 |
| scope.py | File | 15.62 KB | 0644 |
| serializer.py | File | 15.92 KB | 0644 |
| sessions.py | File | 7.67 KB | 0644 |
| tracing.py | File | 25.24 KB | 0644 |
| transport.py | File | 11.48 KB | 0644 |
| utils.py | File | 26.26 KB | 0644 |
| worker.py | File | 3.87 KB | 0644 |