Source code for zgw_consumers.client

import logging
import time
from dataclasses import dataclass
from typing import Any, TypeVar

import jwt
from ape_pie import APIClient
from requests.auth import AuthBase
from requests.models import PreparedRequest

from zgw_consumers.constants import AuthTypes
from zgw_consumers.models import Service

from .nlx import NLXClient

logger = logging.getLogger(__name__)


ClientT = TypeVar("ClientT", bound=APIClient)


[docs] def build_client( service: Service, client_factory: type[ClientT] = NLXClient, **kwargs ) -> ClientT: """ Build a client for a given :class:`zgw_consumers.models.Service`. """ config_adapter = ServiceConfigAdapter(service) return client_factory.configure_from( config_adapter, nlx_base_url=service.nlx, **kwargs )
[docs] @dataclass class ServiceConfigAdapter: """An implementation of :class:`ape_pie.ConfigAdapter` that will extract session kwargs from a given :class:`zgw_consumers.models.Service`. """ service: Service def get_client_base_url(self) -> str: return self.service.api_root def get_client_session_kwargs(self) -> dict[str, Any]: kwargs = {} # mTLS: verify server certificate if configured if server_cert := self.service.server_certificate: # NOTE: this only works with a file-system based storage! kwargs["verify"] = server_cert.public_certificate.path # mTLS: offer client certificate if configured if client_cert := self.service.client_certificate: client_cert_path = client_cert.public_certificate.path # decide between single cert or cert,key tuple variant kwargs["cert"] = ( (client_cert_path, privkey.path) if (privkey := client_cert.private_key) else client_cert_path ) match self.service.auth_type: case AuthTypes.api_key: kwargs["auth"] = APIKeyAuth( header=self.service.header_key, key=self.service.header_value, ) case AuthTypes.zgw: kwargs["auth"] = ZGWAuth(service=self.service) # set timeout for the requests kwargs["timeout"] = self.service.timeout return kwargs
[docs] @dataclass class APIKeyAuth(AuthBase): """API Key based :class:`requests.auth.AuthBase` implementation.""" header: str key: str def __call__(self, request: PreparedRequest): request.headers[self.header] = self.key return request
[docs] @dataclass class ZGWAuth(AuthBase): """ :class:`requests.auth.AuthBase` implementation for ZGW APIs auth. """ service: Service def __post_init__(self): # Generate the JWT Bearer token. Ported from gemma-zds-client ClientAuth. payload = { # standard claims "iss": self.service.client_id, "iat": int(time.time()), # custom claims "client_id": self.service.client_id, "user_id": self.service.user_id, "user_representation": self.service.user_representation, } self._token: str = jwt.encode(payload, self.service.secret, algorithm="HS256") def __call__(self, request: PreparedRequest): request.headers["Authorization"] = f"Bearer {self._token}" return request