"""Async gunicorn worker for aiohttp.web""" import asyncio import os import re import signal import sys from types import FrameType from typing import Any, Awaitable, Callable, Optional, Union # noqa from gunicorn.config import AccessLogFormat as GunicornAccessLogFormat from gunicorn.workers import base from aiohttp import web from .helpers import set_result from .web_app import Application from .web_log import AccessLogger try: import ssl SSLContext = ssl.SSLContext except ImportError: # pragma: no cover ssl = None # type: ignore SSLContext = object # type: ignore __all__ = ("GunicornWebWorker", "GunicornUVLoopWebWorker", "GunicornTokioWebWorker") class GunicornWebWorker(base.Worker): DEFAULT_AIOHTTP_LOG_FORMAT = AccessLogger.LOG_FORMAT DEFAULT_GUNICORN_LOG_FORMAT = GunicornAccessLogFormat.default def __init__(self, *args: Any, **kw: Any) -> None: # pragma: no cover super().__init__(*args, **kw) self._task = None # type: Optional[asyncio.Task[None]] self.exit_code = 0 self._notify_waiter = None # type: Optional[asyncio.Future[bool]] def init_process(self) -> None: # create new event_loop after fork asyncio.get_event_loop().close() self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) super().init_process() def run(self) -> None: self._task = self.loop.create_task(self._run()) try: # ignore all finalization problems self.loop.run_until_complete(self._task) except Exception: self.log.exception("Exception in gunicorn worker") if sys.version_info >= (3, 6): self.loop.run_until_complete(self.loop.shutdown_asyncgens()) self.loop.close() sys.exit(self.exit_code) async def _run(self) -> None: if isinstance(self.wsgi, Application): app = self.wsgi elif asyncio.iscoroutinefunction(self.wsgi): app = await self.wsgi() else: raise RuntimeError( "wsgi app should be either Application or " "async function returning Application, got {}".format(self.wsgi) ) access_log = self.log.access_log if self.cfg.accesslog else None runner = web.AppRunner( app, logger=self.log, keepalive_timeout=self.cfg.keepalive, access_log=access_log, access_log_format=self._get_valid_log_format(self.cfg.access_log_format), ) await runner.setup() ctx = self._create_ssl_context(self.cfg) if self.cfg.is_ssl else None runner = runner assert runner is not None server = runner.server assert server is not None for sock in self.sockets: site = web.SockSite( runner, sock, ssl_context=ctx, shutdown_timeout=self.cfg.graceful_timeout / 100 * 95, ) await site.start() # If our parent changed then we shut down. pid = os.getpid() try: while self.alive: # type: ignore self.notify() cnt = server.requests_count if self.cfg.max_requests and cnt > self.cfg.max_requests: self.alive = False self.log.info("Max requests, shutting down: %s", self) elif pid == os.getpid() and self.ppid != os.getppid(): self.alive = False self.log.info("Parent changed, shutting down: %s", self) else: await self._wait_next_notify() except BaseException: pass await runner.cleanup() def _wait_next_notify(self) -> "asyncio.Future[bool]": self._notify_waiter_done() loop = self.loop assert loop is not None self._notify_waiter = waiter = loop.create_future() self.loop.call_later(1.0, self._notify_waiter_done, waiter) return waiter def _notify_waiter_done( self, waiter: Optional["asyncio.Future[bool]"] = None ) -> None: if waiter is None: waiter = self._notify_waiter if waiter is not None: set_result(waiter, True) if waiter is self._notify_waiter: self._notify_waiter = None def init_signals(self) -> None: # Set up signals through the event loop API. self.loop.add_signal_handler( signal.SIGQUIT, self.handle_quit, signal.SIGQUIT, None ) self.loop.add_signal_handler( signal.SIGTERM, self.handle_exit, signal.SIGTERM, None ) self.loop.add_signal_handler( signal.SIGINT, self.handle_quit, signal.SIGINT, None ) self.loop.add_signal_handler( signal.SIGWINCH, self.handle_winch, signal.SIGWINCH, None ) self.loop.add_signal_handler( signal.SIGUSR1, self.handle_usr1, signal.SIGUSR1, None ) self.loop.add_signal_handler( signal.SIGABRT, self.handle_abort, signal.SIGABRT, None ) # Don't let SIGTERM and SIGUSR1 disturb active requests # by interrupting system calls signal.siginterrupt(signal.SIGTERM, False) signal.siginterrupt(signal.SIGUSR1, False) def handle_quit(self, sig: int, frame: FrameType) -> None: self.alive = False # worker_int callback self.cfg.worker_int(self) # wakeup closing process self._notify_waiter_done() def handle_abort(self, sig: int, frame: FrameType) -> None: self.alive = False self.exit_code = 1 self.cfg.worker_abort(self) sys.exit(1) @staticmethod def _create_ssl_context(cfg: Any) -> "SSLContext": """Creates SSLContext instance for usage in asyncio.create_server. See ssl.SSLSocket.__init__ for more details. """ if ssl is None: # pragma: no cover raise RuntimeError("SSL is not supported.") ctx = ssl.SSLContext(cfg.ssl_version) ctx.load_cert_chain(cfg.certfile, cfg.keyfile) ctx.verify_mode = cfg.cert_reqs if cfg.ca_certs: ctx.load_verify_locations(cfg.ca_certs) if cfg.ciphers: ctx.set_ciphers(cfg.ciphers) return ctx def _get_valid_log_format(self, source_format: str) -> str: if source_format == self.DEFAULT_GUNICORN_LOG_FORMAT: return self.DEFAULT_AIOHTTP_LOG_FORMAT elif re.search(r"%\([^\)]+\)", source_format): raise ValueError( "Gunicorn's style options in form of `%(name)s` are not " "supported for the log formatting. Please use aiohttp's " "format specification to configure access log formatting: " "http://docs.aiohttp.org/en/stable/logging.html" "#format-specification" ) else: return source_format class GunicornUVLoopWebWorker(GunicornWebWorker): def init_process(self) -> None: import uvloop # Close any existing event loop before setting a # new policy. asyncio.get_event_loop().close() # Setup uvloop policy, so that every # asyncio.get_event_loop() will create an instance # of uvloop event loop. asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) super().init_process() class GunicornTokioWebWorker(GunicornWebWorker): def init_process(self) -> None: # pragma: no cover import tokio # Close any existing event loop before setting a # new policy. asyncio.get_event_loop().close() # Setup tokio policy, so that every # asyncio.get_event_loop() will create an instance # of tokio event loop. asyncio.set_event_loop_policy(tokio.EventLoopPolicy()) super().init_process()
Name | Type | Size | Permission | Actions |
---|---|---|---|---|
.hash | Folder | 0755 |
|
|
__pycache__ | Folder | 0755 |
|
|
__init__.py | File | 6.77 KB | 0644 |
|
_cparser.pxd | File | 3.87 KB | 0644 |
|
_find_header.c | File | 183.17 KB | 0644 |
|
_find_header.h | File | 170 B | 0644 |
|
_find_header.pxd | File | 68 B | 0644 |
|
_frozenlist.c | File | 287.3 KB | 0644 |
|
_frozenlist.cpython-38-x86_64-linux-gnu.so | File | 337.8 KB | 0755 |
|
_frozenlist.pyx | File | 2.54 KB | 0644 |
|
_headers.pxi | File | 1.96 KB | 0644 |
|
_helpers.c | File | 207.02 KB | 0644 |
|
_helpers.cpython-38-x86_64-linux-gnu.so | File | 223.16 KB | 0755 |
|
_helpers.pyi | File | 202 B | 0644 |
|
_helpers.pyx | File | 1.02 KB | 0644 |
|
_http_parser.c | File | 987.82 KB | 0644 |
|
_http_parser.cpython-38-x86_64-linux-gnu.so | File | 2.17 MB | 0755 |
|
_http_parser.pyx | File | 28.34 KB | 0644 |
|
_http_writer.c | File | 208.03 KB | 0644 |
|
_http_writer.cpython-38-x86_64-linux-gnu.so | File | 199.89 KB | 0755 |
|
_http_writer.pyx | File | 4.1 KB | 0644 |
|
_websocket.c | File | 134.21 KB | 0644 |
|
_websocket.cpython-38-x86_64-linux-gnu.so | File | 98.52 KB | 0755 |
|
_websocket.pyx | File | 1.52 KB | 0644 |
|
abc.py | File | 5.12 KB | 0644 |
|
base_protocol.py | File | 2.64 KB | 0644 |
|
client.py | File | 42.89 KB | 0644 |
|
client_exceptions.py | File | 8.33 KB | 0644 |
|
client_proto.py | File | 7.97 KB | 0644 |
|
client_reqrep.py | File | 35.58 KB | 0644 |
|
client_ws.py | File | 10.05 KB | 0644 |
|
connector.py | File | 41.96 KB | 0644 |
|
cookiejar.py | File | 11.88 KB | 0644 |
|
formdata.py | File | 5.94 KB | 0644 |
|
frozenlist.py | File | 1.68 KB | 0644 |
|
frozenlist.pyi | File | 1.4 KB | 0644 |
|
hdrs.py | File | 3.37 KB | 0644 |
|
helpers.py | File | 22.38 KB | 0644 |
|
http.py | File | 1.78 KB | 0644 |
|
http_exceptions.py | File | 2.53 KB | 0644 |
|
http_parser.py | File | 30.06 KB | 0644 |
|
http_websocket.py | File | 24.51 KB | 0644 |
|
http_writer.py | File | 5.22 KB | 0644 |
|
locks.py | File | 1.19 KB | 0644 |
|
log.py | File | 325 B | 0644 |
|
multipart.py | File | 31.5 KB | 0644 |
|
payload.py | File | 13.02 KB | 0644 |
|
payload_streamer.py | File | 2.05 KB | 0644 |
|
py.typed | File | 7 B | 0644 |
|
pytest_plugin.py | File | 10.75 KB | 0644 |
|
resolver.py | File | 4.5 KB | 0644 |
|
signals.py | File | 852 B | 0644 |
|
signals.pyi | File | 319 B | 0644 |
|
streams.py | File | 20.05 KB | 0644 |
|
tcp_helpers.py | File | 962 B | 0644 |
|
test_utils.py | File | 19.78 KB | 0644 |
|
tracing.py | File | 14.03 KB | 0644 |
|
typedefs.py | File | 1.34 KB | 0644 |
|
web.py | File | 17.46 KB | 0644 |
|
web_app.py | File | 16.65 KB | 0644 |
|
web_exceptions.py | File | 9.87 KB | 0644 |
|
web_fileresponse.py | File | 8.81 KB | 0644 |
|
web_log.py | File | 7.32 KB | 0644 |
|
web_middlewares.py | File | 4.09 KB | 0644 |
|
web_protocol.py | File | 22.71 KB | 0644 |
|
web_request.py | File | 25.83 KB | 0644 |
|
web_response.py | File | 25.59 KB | 0644 |
|
web_routedef.py | File | 5.97 KB | 0644 |
|
web_runner.py | File | 10.93 KB | 0644 |
|
web_server.py | File | 2.01 KB | 0644 |
|
web_urldispatcher.py | File | 38.61 KB | 0644 |
|
web_ws.py | File | 16.39 KB | 0644 |
|
worker.py | File | 7.83 KB | 0644 |
|