# SPDX-FileCopyrightText: 2015 Eric Larson
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import functools
import types
import zlib
from typing import TYPE_CHECKING, Any, Collection, Mapping
from pip._vendor.requests.adapters import HTTPAdapter
from pip._vendor.cachecontrol.cache import DictCache
from pip._vendor.cachecontrol.controller import PERMANENT_REDIRECT_STATUSES, CacheController
from pip._vendor.cachecontrol.filewrapper import CallbackFileWrapper
if TYPE_CHECKING:
from pip._vendor.requests import PreparedRequest, Response
from pip._vendor.urllib3 import HTTPResponse
from pip._vendor.cachecontrol.cache import BaseCache
from pip._vendor.cachecontrol.heuristics import BaseHeuristic
from pip._vendor.cachecontrol.serialize import Serializer
class CacheControlAdapter(HTTPAdapter):
    """An ``HTTPAdapter`` that consults and populates an HTTP cache.

    ``send`` looks for a cached response before going to the network, and
    ``build_response`` stores (or refreshes) cache entries for the responses
    that come back.  The caching decisions themselves are delegated to the
    controller object created in ``__init__``.
    """

    # A successful (``resp.ok``) response to one of these methods removes the
    # cache entry stored for the request URL.
    invalidating_methods = {"PUT", "PATCH", "DELETE"}

    def __init__(
        self,
        cache: BaseCache | None = None,
        cache_etags: bool = True,
        controller_class: type[CacheController] | None = None,
        serializer: Serializer | None = None,
        heuristic: BaseHeuristic | None = None,
        cacheable_methods: Collection[str] | None = None,
        *args: Any,
        **kw: Any,
    ) -> None:
        """
        Create the adapter and the cache controller it delegates to.

        :param cache: cache backend; a new ``DictCache`` is used when ``None``.
        :param cache_etags: forwarded to the controller as ``cache_etags``.
        :param controller_class: controller type to instantiate; defaults to
            ``CacheController``.
        :param serializer: forwarded to the controller as ``serializer``.
        :param heuristic: optional heuristic applied to responses in
            ``build_response`` before they are considered for caching.
        :param cacheable_methods: HTTP methods eligible for caching; defaults
            to ``("GET",)`` when ``None`` or empty.
        :param args: extra positional arguments for ``HTTPAdapter.__init__``.
        :param kw: extra keyword arguments for ``HTTPAdapter.__init__``.
        """
        super().__init__(*args, **kw)
        self.cache = DictCache() if cache is None else cache
        self.heuristic = heuristic
        self.cacheable_methods = cacheable_methods or ("GET",)

        controller_factory = controller_class or CacheController
        self.controller = controller_factory(
            self.cache, cache_etags=cache_etags, serializer=serializer
        )

    def send(
        self,
        request: PreparedRequest,
        stream: bool = False,
        timeout: None | float | tuple[float, float] | tuple[float, None] = None,
        verify: bool | str = True,
        cert: (None | bytes | str | tuple[bytes | str, bytes | str]) = None,
        proxies: Mapping[str, str] | None = None,
        cacheable_methods: Collection[str] | None = None,
    ) -> Response:
        """
        Send a request, answering from the cache when possible.

        For a cacheable method the controller is asked for a cached response
        first.  If it has one, the response is built from it without touching
        the network.  Otherwise the controller's conditional headers are
        added to the request and it is sent through the parent adapter.

        :param cacheable_methods: per-call override of the adapter's
            ``cacheable_methods``; ignored when ``None`` or empty.
        :return: the response, either rebuilt from the cache or produced by
            ``HTTPAdapter.send``.
        """
        cacheable = cacheable_methods or self.cacheable_methods
        if request.method in cacheable:
            try:
                cached_response = self.controller.cached_request(request)
            except zlib.error:
                # A cache entry that fails to decompress is treated as a miss.
                cached_response = None
            if cached_response:
                return self.build_response(request, cached_response, from_cache=True)

            # check for etags and add headers if appropriate
            request.headers.update(self.controller.conditional_headers(request))

        resp = super().send(request, stream, timeout, verify, cert, proxies)

        return resp

    def build_response(
        self,
        request: PreparedRequest,
        response: HTTPResponse,
        from_cache: bool = False,
        cacheable_methods: Collection[str] | None = None,
    ) -> Response:
        """
        Build a ``Response`` from a raw response, updating the cache.

        When the raw response did not come from the cache and the request
        method is cacheable, one of three things happens:

        * status 304: the controller updates the cached entry, and the entry
          it returns replaces the server response;
        * a permanent redirect status: the response is cached immediately;
        * anything else: the body stream is wrapped so the response is
          cached once the stream has been consumed.

        Afterwards, a successful response to an invalidating method deletes
        the cache entry for the request URL.

        :param from_cache: ``True`` when ``response`` was loaded from the
            cache, which skips all of the caching steps above.
        :param cacheable_methods: per-call override of the adapter's
            ``cacheable_methods``; ignored when ``None`` or empty.
        :return: the built response, carrying a ``from_cache`` attribute.
        """
        cacheable = cacheable_methods or self.cacheable_methods
        if not from_cache and request.method in cacheable:
            # Check for any heuristics that might update headers
            # before trying to cache.
            if self.heuristic:
                response = self.heuristic.apply(response)

            if response.status == 304:
                # We must have sent an ETag request. This could mean
                # that we've been expired already or that we simply
                # have an etag. In either case, we want to try and
                # update the cache if that is the case.
                cached_response = self.controller.update_cached_response(
                    request, response
                )

                # Getting a different object back means the controller
                # handed us the cached entry instead of the 304 itself.
                if cached_response is not response:
                    from_cache = True

                # We are done with the server response, read a
                # possible response body (compliant servers will
                # not return one, but we cannot be 100% sure) and
                # release the connection back to the pool.
                response.read(decode_content=False)
                response.release_conn()

                response = cached_response

            # Permanent redirects are always cached.
            elif int(response.status) in PERMANENT_REDIRECT_STATUSES:
                self.controller.cache_response(request, response)
            else:
                # Wrap the response file with a wrapper that will cache the
                # response when the stream has been consumed.
                response._fp = CallbackFileWrapper(  # type: ignore[assignment]
                    response._fp,  # type: ignore[arg-type]
                    functools.partial(
                        self.controller.cache_response, request, response
                    ),
                )
                if response.chunked:
                    super_update_chunk_length = response._update_chunk_length

                    def _update_chunk_length(self: HTTPResponse) -> None:
                        super_update_chunk_length()
                        # No bytes left in the chunk: close the wrapper
                        # explicitly (presumably so its callback can run
                        # for chunked bodies -- confirm in filewrapper).
                        if self.chunk_left == 0:
                            self._fp._close()  # type: ignore[union-attr]

                    # Bind the replacement to this one response object only.
                    response._update_chunk_length = types.MethodType(  # type: ignore[method-assign]
                        _update_chunk_length, response
                    )

        resp: Response = super().build_response(request, response)  # type: ignore[no-untyped-call]

        # See if we should invalidate the cache.
        if request.method in self.invalidating_methods and resp.ok:
            assert request.url is not None
            cache_url = self.controller.cache_url(request.url)
            self.cache.delete(cache_url)

        # Give the request a from_cache attr to let people use it
        resp.from_cache = from_cache  # type: ignore[attr-defined]

        return resp

    def close(self) -> None:
        """Close the cache backend, then the parent adapter.

        The parent's ``close`` runs even when closing the cache raises, so a
        failing cache backend cannot prevent the adapter's own cleanup.  The
        cache error, if any, still propagates to the caller.
        """
        try:
            self.cache.close()
        finally:
            super().close()  # type: ignore[no-untyped-call]