import os
import uuid
import time
from datetime import datetime
from threading import Thread, Lock
from contextlib import contextmanager
from sentry_sdk._types import MYPY
from sentry_sdk.utils import format_timestamp
if MYPY:
import sentry_sdk
from typing import Optional
from typing import Union
from typing import Any
from typing import Dict
from typing import Generator
from sentry_sdk._types import SessionStatus
def is_auto_session_tracking_enabled(hub=None):
# type: (Optional[sentry_sdk.Hub]) -> bool
"""Utility function to find out if session tracking is enabled."""
if hub is None:
hub = sentry_sdk.Hub.current
should_track = hub.scope._force_auto_session_tracking
if should_track is None:
exp = hub.client.options["_experiments"] if hub.client else {}
should_track = exp.get("auto_session_tracking")
return should_track
@contextmanager
def auto_session_tracking(hub=None):
# type: (Optional[sentry_sdk.Hub]) -> Generator[None, None, None]
"""Starts and stops a session automatically around a block."""
if hub is None:
hub = sentry_sdk.Hub.current
should_track = is_auto_session_tracking_enabled(hub)
if should_track:
hub.start_session()
try:
yield
finally:
if should_track:
hub.end_session()
def _make_uuid(
val, # type: Union[str, uuid.UUID]
):
# type: (...) -> uuid.UUID
if isinstance(val, uuid.UUID):
return val
return uuid.UUID(val)
TERMINAL_SESSION_STATES = ("exited", "abnormal", "crashed")
class SessionFlusher(object):
def __init__(
self,
flush_func, # type: Any
flush_interval=10, # type: int
):
# type: (...) -> None
self.flush_func = flush_func
self.flush_interval = flush_interval
self.pending = {} # type: Dict[str, Any]
self._thread = None # type: Optional[Thread]
self._thread_lock = Lock()
self._thread_for_pid = None # type: Optional[int]
self._running = True
def flush(self):
# type: (...) -> None
pending = self.pending
self.pending = {}
self.flush_func(list(pending.values()))
def _ensure_running(self):
# type: (...) -> None
if self._thread_for_pid == os.getpid() and self._thread is not None:
return None
with self._thread_lock:
if self._thread_for_pid == os.getpid() and self._thread is not None:
return None
def _thread():
# type: (...) -> None
while self._running:
time.sleep(self.flush_interval)
if self.pending and self._running:
self.flush()
thread = Thread(target=_thread)
thread.daemon = True
thread.start()
self._thread = thread
self._thread_for_pid = os.getpid()
return None
def add_session(
self, session # type: Session
):
# type: (...) -> None
self.pending[session.sid.hex] = session.to_json()
self._ensure_running()
def kill(self):
# type: (...) -> None
self._running = False
def __del__(self):
# type: (...) -> None
self.kill()
class Session(object):
def __init__(
self,
sid=None, # type: Optional[Union[str, uuid.UUID]]
did=None, # type: Optional[str]
timestamp=None, # type: Optional[datetime]
started=None, # type: Optional[datetime]
duration=None, # type: Optional[float]
status=None, # type: Optional[SessionStatus]
release=None, # type: Optional[str]
environment=None, # type: Optional[str]
user_agent=None, # type: Optional[str]
ip_address=None, # type: Optional[str]
errors=None, # type: Optional[int]
user=None, # type: Optional[Any]
):
# type: (...) -> None
if sid is None:
sid = uuid.uuid4()
if started is None:
started = datetime.utcnow()
if status is None:
status = "ok"
self.status = status
self.did = None # type: Optional[str]
self.started = started
self.release = None # type: Optional[str]
self.environment = None # type: Optional[str]
self.duration = None # type: Optional[float]
self.user_agent = None # type: Optional[str]
self.ip_address = None # type: Optional[str]
self.errors = 0
self.update(
sid=sid,
did=did,
timestamp=timestamp,
duration=duration,
release=release,
environment=environment,
user_agent=user_agent,
ip_address=ip_address,
errors=errors,
user=user,
)
def update(
self,
sid=None, # type: Optional[Union[str, uuid.UUID]]
did=None, # type: Optional[str]
timestamp=None, # type: Optional[datetime]
started=None, # type: Optional[datetime]
duration=None, # type: Optional[float]
status=None, # type: Optional[SessionStatus]
release=None, # type: Optional[str]
environment=None, # type: Optional[str]
user_agent=None, # type: Optional[str]
ip_address=None, # type: Optional[str]
errors=None, # type: Optional[int]
user=None, # type: Optional[Any]
):
# type: (...) -> None
# If a user is supplied we pull some data form it
if user:
if ip_address is None:
ip_address = user.get("ip_address")
if did is None:
did = user.get("id") or user.get("email") or user.get("username")
if sid is not None:
self.sid = _make_uuid(sid)
if did is not None:
self.did = str(did)
if timestamp is None:
timestamp = datetime.utcnow()
self.timestamp = timestamp
if started is not None:
self.started = started
if duration is not None:
self.duration = duration
if release is not None:
self.release = release
if environment is not None:
self.environment = environment
if ip_address is not None:
self.ip_address = ip_address
if user_agent is not None:
self.user_agent = user_agent
if errors is not None:
self.errors = errors
if status is not None:
self.status = status
def close(
self, status=None # type: Optional[SessionStatus]
):
# type: (...) -> Any
if status is None and self.status == "ok":
status = "exited"
if status is not None:
self.update(status=status)
def to_json(self):
# type: (...) -> Any
rv = {
"sid": str(self.sid),
"init": True,
"started": format_timestamp(self.started),
"timestamp": format_timestamp(self.timestamp),
"status": self.status,
} # type: Dict[str, Any]
if self.errors:
rv["errors"] = self.errors
if self.did is not None:
rv["did"] = self.did
if self.duration is not None:
rv["duration"] = self.duration
attrs = {}
if self.release is not None:
attrs["release"] = self.release
if self.environment is not None:
attrs["environment"] = self.environment
if self.ip_address is not None:
attrs["ip_address"] = self.ip_address
if self.user_agent is not None:
attrs["user_agent"] = self.user_agent
if attrs:
rv["attrs"] = attrs
return rv