# Kafka AidboxTopicDestination

{% hint style="info" %}
This functionality is available in Aidbox versions 2409 and later and requires the [FHIR Schema](../../modules/profiling-and-validation/fhir-schema-validator/) validation engine to be [enabled](../../modules/profiling-and-validation/fhir-schema-validator/setup-aidbox-with-fhir-schema-validation-engine.md).
{% endhint %}

This page describes an AidboxTopicDestination that stores events described by an AidboxSubscriptionTopic in Kafka.

Aidbox provides two kinds of Kafka integrations:

1. _Best effort_: Aidbox stores events in memory. In some cases (for example, if Aidbox crashes or Kafka is unavailable), events can be lost.
2. _At least once_: Aidbox stores events in the database in the same transaction with a CRUD operation. Aidbox guarantees at least once delivery for an event.

`Best effort` incurs a lower performance cost than the `at least once` approach. Choose `at least once` if performance is not a concern for you.

{% hint style="warning" %}
Be careful when using `Best effort` with batch transactions. Messages are generated while batch entries are processed, so if the batch transaction fails at the end, the messages will not be revoked.
{% endhint %}

{% hint style="warning" %}
Note that the `at least once` approach uses **transactional** Kafka producers. Make sure that `transaction.state.log.replication.factor` is less than or equal to the number of brokers in your Kafka cluster. Otherwise, sending messages from Aidbox to Kafka may fail with a `Timeout expired after ...ms while awaiting InitProducerId` error.
{% endhint %}

{% content-ref url="./" %}
[.](./)
{% endcontent-ref %}

## Configuration

To use Kafka with [#aidboxsubscriptiontopic](kafka-aidboxtopicdestination.md#aidboxsubscriptiontopic), you have to create an [#aidboxtopicdestination](kafka-aidboxtopicdestination.md#aidboxtopicdestination) resource.

There are two FHIR profiles available to use with Kafka:

for _best effort_:

```
http://aidbox.app/StructureDefinition/aidboxtopicdestination-kafka-best-effort
```

for _at least once_:

```
http://aidbox.app/StructureDefinition/aidboxtopicdestination-kafka-at-least-once
```

### Available Parameters

{% hint style="info" %}
For additional details, see the [Kafka Producer Configs Documentation](https://kafka.apache.org/documentation/#producerconfigs).
{% endhint %}

| Parameter name | Value type | Description |
| --- | --- | --- |
| `kafkaTopic` \* | valueString | The Kafka topic where the data should be sent. |
| `bootstrapServers` \* | valueString | Comma-separated string. A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. |
| `compressionType` | valueString | Specify the final compression type for a given topic. This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). |
| `batchSize` | valueInteger | This configuration controls the default batch size in bytes. |
| `deliveryTimeoutMs` | valueInteger | A maximum time limit for reporting the success or failure of a record sent by a producer, covering delays before sending, waiting for broker acknowledgment, and handling retriable errors. |
| `maxBlockMs` | valueInteger | The configuration controls how long the KafkaProducer's `send()` method will block. |
| `maxRequestSize` | valueInteger | The maximum size of a request in bytes. |
| `requestTimeoutMs` | valueInteger | The maximum amount of time the client will wait for the response of a request. |
| `sslKeystoreKey` | valueString | Private key in the format specified by 'ssl.keystore.type'. |
| `securityProtocol` | valueString | Protocol used to communicate with brokers. |
| `saslMechanism` | valueString | SASL mechanism used for client connections. |
| `saslJaasConfig` | valueString | JAAS login context parameters for SASL connections in the format used by JAAS configuration files. |
| `saslClientCallbackHandlerClass` | valueString | The fully qualified name of a SASL client callback handler class that implements the AuthenticateCallbackHandler interface. |

\* required parameter.

## Examples

### Standalone Kafka (at least once)

See the full example on [GitHub](https://github.com/Aidbox/examples/tree/main/aidbox-features/aidbox-subscriptions-to-kafka).

```
POST /fhir/AidboxTopicDestination
content-type: application/json
accept: application/json

{
  "resourceType": "AidboxTopicDestination",
  "meta": {
    "profile": [
      "http://aidbox.app/StructureDefinition/aidboxtopicdestination-kafka-at-least-once"
    ]
  },
  "kind": "kafka-at-least-once",
  "id": "kafka-destination",
  "topic": "http://example.org/FHIR/R5/SubscriptionTopic/QuestionnaireResponse-topic",
  "parameter": [
    {
      "name": "kafkaTopic",
      "valueString": "aidbox-forms"
    },
    {
      "name": "bootstrapServers",
      "valueString": "kafka:29092"
    }
  ]
}
```

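
The request body above can also be assembled programmatically. The following is a minimal Python sketch, not part of Aidbox: the function `build_destination` and the constants `PROFILES`, `REQUIRED_PARAMS`, and `INTEGER_PARAMS` are hypothetical names introduced for illustration. It only builds the JSON document from the profiles and parameter types listed on this page; it does not send the request.

```python
import json

# Profile URLs as listed in the Configuration section of this page.
PROFILES = {
    "kafka-best-effort": "http://aidbox.app/StructureDefinition/aidboxtopicdestination-kafka-best-effort",
    "kafka-at-least-once": "http://aidbox.app/StructureDefinition/aidboxtopicdestination-kafka-at-least-once",
}

# Parameters marked as required in the Available Parameters table.
REQUIRED_PARAMS = ("kafkaTopic", "bootstrapServers")

# Parameters whose value type is valueInteger; all others use valueString.
INTEGER_PARAMS = {
    "batchSize",
    "deliveryTimeoutMs",
    "maxBlockMs",
    "maxRequestSize",
    "requestTimeoutMs",
}


def build_destination(resource_id, kind, topic, params):
    """Build an AidboxTopicDestination resource as a plain dict (hypothetical helper)."""
    if kind not in PROFILES:
        raise ValueError(f"unknown kind: {kind!r}")
    missing = [name for name in REQUIRED_PARAMS if name not in params]
    if missing:
        raise ValueError(f"missing required parameters: {missing}")
    parameter = [
        {
            "name": name,
            ("valueInteger" if name in INTEGER_PARAMS else "valueString"): value,
        }
        for name, value in params.items()
    ]
    return {
        "resourceType": "AidboxTopicDestination",
        "meta": {"profile": [PROFILES[kind]]},
        "kind": kind,
        "id": resource_id,
        "topic": topic,
        "parameter": parameter,
    }


if __name__ == "__main__":
    destination = build_destination(
        "kafka-destination",
        "kafka-at-least-once",
        "http://example.org/FHIR/R5/SubscriptionTopic/QuestionnaireResponse-topic",
        {"kafkaTopic": "aidbox-forms", "bootstrapServers": "kafka:29092"},
    )
    print(json.dumps(destination, indent=2))
```

The resulting dict can be serialized and sent as the body of `POST /fhir/AidboxTopicDestination` with any HTTP client.
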
### AWS MSK Kafka with IAM (best effort)

```
POST /fhir/AidboxTopicDestination
content-type: application/json
accept: application/json

{
  "resourceType": "AidboxTopicDestination",
  "meta": {
    "profile": [
      "http://aidbox.app/StructureDefinition/aidboxtopicdestination-kafka-best-effort"
    ]
  },
  "kind": "kafka-best-effort",
  "id": "kafka-destination",
  "topic": "http://example.org/FHIR/R5/SubscriptionTopic/QuestionnaireResponse-topic",
  "parameter": [
    {
      "name": "kafkaTopic",
      "valueString": "aidbox-forms"
    },
    {
      "name": "bootstrapServers",
      "valueString": "<...>.amazonaws.com:9098,<...>.amazonaws.com:9098"
    },
    {
      "name": "securityProtocol",
      "valueString": "SASL_SSL"
    },
    {
      "name": "saslMechanism",
      "valueString": "AWS_MSK_IAM"
    },
    {
      "name": "saslJaasConfig",
      "valueString": "software.amazon.msk.auth.iam.IAMLoginModule required;"
    },
    {
      "name": "saslClientCallbackHandlerClass",
      "valueString": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }
  ]
}
```

## **Status Introspection**

Aidbox provides a `$status` operation that returns a short summary of the integration status:

{% tabs %}
{% tab title="Request" %}
```yaml
GET /fhir/AidboxTopicDestination/<id>/$status
content-type: application/json
accept: application/json
```
{% endtab %}

{% tab title="Response (best-effort)" %}
{% code title="200 OK" %}
```json
{
  "resourceType": "Parameters",
  "parameter": [
    {
      "valueDecimal": 1,
      "name": "messagesDelivered"
    },
    {
      "valueDecimal": 0,
      "name": "messagesInProcess"
    },
    {
      "valueDecimal": 1,
      "name": "messagesLost"
    },
    {
      "valueDateTime": "2024-10-03T08:43:36Z",
      "name": "startTimestamp"
    },
    {
      "valueString": "active",
      "name": "status"
    },
    {
      "name": "lastErrorDetail",
      "part": [
        {
          "valueString": "Timeout expired after 10000ms while awaiting InitProducerId",
          "name": "message"
        },
        {
          "valueDateTime": "2024-10-03T08:44:09Z",
          "name": "timestamp"
        }
      ]
    }
  ]
}
```
{% endcode %}
{% endtab %}

{% tab title="Response (at-least-once)" %}
{% code title="200 OK" %}
```json
{
  "resourceType": "Parameters",
  "parameter": [
    {
      "name": "messagesDelivered",
      "valueDecimal": 1
    },
    {
      "name": "messagesDeliveryAttempts",
      "valueDecimal": 2
    },
    {
      "name": "messagesInProcess",
      "valueDecimal": 0
    },
    {
      "name": "messagesQueued",
      "valueDecimal": 0
    },
    {
      "name": "startTimestamp",
      "valueDateTime": "2024-10-03T08:18:47Z"
    },
    {
      "name": "status",
      "valueString": "active"
    },
    {
      "name": "lastErrorDetail",
      "part": [
        {
          "valueString": "Timeout expired after 10000ms while awaiting InitProducerId",
          "name": "message"
        },
        {
          "valueDateTime": "2024-10-03T08:19:32Z",
          "name": "timestamp"
        }
      ]
    }
  ]
}
```
{% endcode %}
{% endtab %}
{% endtabs %}

Response parameters for `best-effort`:


| Property | Type | Description |
| --- | --- | --- |
| `startTimestamp` | valueDateTime | AidboxTopicDestination start time in UTC. |
| `status` | valueString | AidboxTopicDestination status is always `active`, which means that AidboxTopicDestination will try to send all received notifications. |
| `messagesDelivered` | valueDecimal | Total number of events that have been successfully delivered. |
| `messagesInProcess` | valueDecimal | Current number of events in the buffer being processed for delivery. |
| `messagesLost` | valueDecimal | Total number of events that failed to be delivered. |
| `lastErrorDetail` | part | Information about errors of the latest failed attempt to send an event. This parameter can be repeated up to 5 times. Includes the following parameters. |
| `lastErrorDetail.message` | valueString | Error message of the given error. |
| `lastErrorDetail.timestamp` | valueDateTime | Timestamp of the given error. |

Response parameters for `at-least-once`:

| Property | Type | Description |
| --- | --- | --- |
| `messagesDelivered` | valueDecimal | Total number of events that have been successfully delivered. |
| `messagesDeliveryAttempts` | valueDecimal | Number of delivery attempts that failed. |
| `messagesInProcess` | valueDecimal | Current number of events in the buffer being processed for delivery. |
| `messagesQueued` | valueDecimal | Number of events pending in the queue for dispatch to the Kafka driver. |
| `startTimestamp` | valueDateTime | AidboxTopicDestination start time in UTC. |
| `status` | valueString | AidboxTopicDestination status is always `active`, which means that AidboxTopicDestination will try to send all received notifications. |
| `lastErrorDetail` | part | Information about errors of the latest failed attempt to send an event. This parameter can be repeated up to 5 times. Includes the following parameters. |
| `lastErrorDetail.message` | valueString | Error message of the given error. |
| `lastErrorDetail.timestamp` | valueDateTime | Timestamp of the given error. |

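
For monitoring, it can be convenient to flatten a `$status` response into a simple dictionary. The following Python sketch assumes the response has the `Parameters` shape shown in the examples above. The names `summarize_status` and `_value` are hypothetical helpers introduced for illustration, not part of Aidbox.

```python
def _value(param):
    """Return the first `value[x]` field of a Parameters entry, or None."""
    for key, value in param.items():
        if key.startswith("value"):
            return value
    return None


def summarize_status(parameters):
    """Flatten a $status Parameters resource into a dict (hypothetical helper).

    Scalar parameters become top-level keys. Each `lastErrorDetail` entry,
    which may be repeated, is collected into the `errors` list.
    """
    summary = {"errors": []}
    for param in parameters.get("parameter", []):
        if param["name"] == "lastErrorDetail":
            detail = {part["name"]: _value(part) for part in param.get("part", [])}
            summary["errors"].append(detail)
        else:
            summary[param["name"]] = _value(param)
    return summary


if __name__ == "__main__":
    # Abbreviated version of the best-effort response shown above.
    response = {
        "resourceType": "Parameters",
        "parameter": [
            {"valueDecimal": 1, "name": "messagesDelivered"},
            {"valueDecimal": 1, "name": "messagesLost"},
            {"valueString": "active", "name": "status"},
            {
                "name": "lastErrorDetail",
                "part": [
                    {
                        "valueString": "Timeout expired after 10000ms while awaiting InitProducerId",
                        "name": "message",
                    },
                    {"valueDateTime": "2024-10-03T08:44:09Z", "name": "timestamp"},
                ],
            },
        ],
    }
    print(summarize_status(response))
```

A caller could, for example, raise an alert when `messagesLost` is greater than zero for a best-effort destination, or when `errors` is non-empty.
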