from __future__ import annotations
import socket
import uuid
from typing import TYPE_CHECKING
from urllib.parse import urlparse, urlsplit, urlunsplit
from django.core.exceptions import ValidationError
from django.db import models
from django.db.models.functions import Length
from django.utils.translation import gettext_lazy as _
from privates.fields import PrivateMediaFileField
from simple_certmanager.models import Certificate
from solo.models import SingletonModel
from typing_extensions import Self, deprecated
from zgw_consumers import settings as zgw_settings
from ..constants import APITypes, AuthTypes, NLXDirectories
from .abstract import RestAPIService
if TYPE_CHECKING:
from ..legacy.client import ZGWClient
[docs]
class Service(RestAPIService):
uuid = models.UUIDField(_("UUID"), default=uuid.uuid4)
api_type = models.CharField(_("type"), max_length=20, choices=APITypes.choices)
api_root = models.CharField(_("api root url"), max_length=255, unique=True)
# credentials for the API
client_id = models.CharField(max_length=255, blank=True)
secret = models.CharField(max_length=255, blank=True)
auth_type = models.CharField(
_("authorization type"),
max_length=20,
choices=AuthTypes.choices,
default=AuthTypes.zgw,
)
header_key = models.CharField(_("header key"), max_length=100, blank=True)
header_value = models.CharField(_("header value"), max_length=255, blank=True)
nlx = models.URLField(
_("NLX url"), max_length=1000, blank=True, help_text=_("NLX (outway) address")
)
user_id = models.CharField(
_("user ID"),
max_length=255,
blank=True,
help_text=_(
"User ID to use for the audit trail. Although these external API credentials are typically used by"
"this API itself instead of a user, the user ID is required."
),
)
user_representation = models.CharField(
_("user representation"),
max_length=255,
blank=True,
help_text=_("Human readable representation of the user."),
)
client_certificate = models.ForeignKey(
Certificate,
blank=True,
null=True,
help_text=_("The SSL/TLS certificate of the client"),
on_delete=models.PROTECT,
related_name="service_client",
)
server_certificate = models.ForeignKey(
Certificate,
blank=True,
null=True,
help_text=_("The SSL/TLS certificate of the server"),
on_delete=models.PROTECT,
related_name="service_server",
)
timeout = models.PositiveSmallIntegerField(
_("timeout"),
help_text=_("Timeout (in seconds) for HTTP calls."),
default=10,
)
class Meta:
verbose_name = _("service")
verbose_name_plural = _("services")
def __str__(self):
return f"[{self.get_api_type_display()}] {self.label}"
def save(self, *args, **kwargs):
if not self.api_root.endswith("/"):
self.api_root = f"{self.api_root}/"
if self.nlx and not self.nlx.endswith("/"):
self.nlx = f"{self.nlx}/"
super().save(*args, **kwargs)
def clean(self):
super().clean()
# validate header_key and header_value
if self.header_key and not self.header_value:
raise ValidationError(
{
"header_value": _(
"If header_key is set, header_value must also be set"
)
}
)
if not self.header_key and self.header_value:
raise ValidationError(
{"header_key": _("If header_value is set, header_key must also be set")}
)
[docs]
@deprecated(
"The `build_client` method is deprecated and will be removed in the next major release. "
"Instead, use the new `ape_pie.APIClient` or `zgw_consumers.nlx.NLXClient`.",
category=DeprecationWarning,
stacklevel=2,
)
def build_client(self, **claims):
"""
Build an API client from the service configuration.
"""
from ..legacy.client import ClientAuth, get_client_class
api_root = self.api_root
if self.nlx:
api_root = api_root.replace(self.api_root, self.nlx, 1)
Client = get_client_class()
client = Client(api_root, schema_url=self.oas, schema_file=self.oas_file)
if self.server_certificate:
client.server_certificate_path = (
self.server_certificate.public_certificate.path
)
if self.client_certificate:
client.client_certificate_path = (
self.client_certificate.public_certificate.path
)
if self.client_certificate.private_key:
client.client_private_key_path = (
self.client_certificate.private_key.path
)
if self.auth_type == AuthTypes.zgw:
client.auth = ClientAuth(
client_id=self.client_id,
secret=self.secret,
user_id=self.user_id,
user_representation=self.user_representation,
**claims,
)
elif self.auth_type == AuthTypes.api_key:
client.auth_value = {self.header_key: self.header_value}
return client
[docs]
@classmethod
def get_service(cls, url: str) -> Self | None:
split_url = urlsplit(url)
scheme_and_domain = urlunsplit(split_url[:2] + ("", "", ""))
candidates = (
cls.objects.filter(api_root__startswith=scheme_and_domain)
.annotate(api_root_length=Length("api_root"))
.order_by("-api_root_length")
)
# select the one matching
for candidate in candidates.iterator():
if url.startswith(candidate.api_root):
return candidate
return None
[docs]
@classmethod
@deprecated(
"The `get_client` class method is deprecated and will be removed in the next "
"major release. Instead, use the `get_service` class method combined with "
"zgw_consumers.client.build_client.",
category=DeprecationWarning,
stacklevel=2,
)
def get_client(cls, url: str, **kwargs) -> ZGWClient | None:
service = cls.get_service(url)
if not service:
return None
return service.build_client(**kwargs)
class NLXConfig(SingletonModel):
directory = models.CharField(
_("NLX directory"), max_length=50, choices=NLXDirectories.choices, blank=True
)
outway = models.URLField(
_("NLX outway address"),
blank=True,
help_text=_("Example: http://my-outway.nlx:8080"),
)
certificate = PrivateMediaFileField(
upload_to="zgw-consumers/nlx/",
blank=True,
help_text=_(
"Your organization TLS certificate for the NLX network. This is used to "
"fetch the list of available services from the NLX directory API."
),
)
certificate_key = PrivateMediaFileField(
upload_to="zgw-consumers/nlx/",
help_text=_(
"Your organization TLS private key for the NLX network. This is used to "
"fetch the list of available services from the NLX directory API."
),
blank=True,
)
class Meta:
verbose_name = _("NLX configuration")
@property
def directory_url(self) -> str:
nlx_directory_urls = zgw_settings.get_setting("NLX_DIRECTORY_URLS")
return nlx_directory_urls.get(self.directory, "")
def save(self, *args, **kwargs):
if self.outway and not self.outway.endswith("/"):
self.outway = f"{self.outway}/"
super().save(*args, **kwargs)
def clean(self):
super().clean()
if not self.outway:
return
# try to tcp connect to the port
parsed = urlparse(self.outway)
default_port = 80 if parsed.scheme == "http" else 443
port = parsed.port or default_port
nlx_outway_timeout = zgw_settings.get_setting("NLX_OUTWAY_TIMEOUT")
with socket.socket() as s:
s.settimeout(nlx_outway_timeout)
try:
s.connect((parsed.hostname, port))
except (OSError, ConnectionRefusedError):
raise ValidationError(
_("Connection refused. Please provide a correct address.")
)