# Kafka AidboxTopicDestination
{% hint style="info" %}
This functionality is available in Aidbox versions 2409 and later and requires [FHIR Schema](../../modules/profiling-and-validation/fhir-schema-validator/) validation engine to be [enabled](../../modules/profiling-and-validation/fhir-schema-validator/setup-aidbox-with-fhir-schema-validation-engine.md).
{% endhint %}
This page describes an AidboxTopicDestination which allows to store events described by an AidboxSubscriptionTopic in Kafka.
Aidbox provides two kinds of Kafka integrations:
1. _Best effort_: Aidbox stores events in memory. In some cases (for example, if Aidbox crashes or Kafka is unavailable), events can be lost.
2. _At least once_: Aidbox stores events in the database in the same transaction with a CRUD operation. Aidbox guarantees at least once delivery for an event.
`Best effort` incurs a lower performance cost than the `at least once` approach. Choose `at least once` if performance is not a concern for you.
{% hint style="warning" %}
Be aware of using `Best effort` with batch transactions. Messages are generated while processing batch entries, so if the batch transaction fails at the end, the messages will not be revoked.
{% endhint %}
{% hint style="warning" %}
Please note that `at least once`approach uses **transactional** Kafka producers. Please make sure that `transaction.state.log.replication.factor` is less or equal then the number of brokers in your Kafka cluster. Otherwise sending messages from Aidbox to Kafka may fail with `Timeout expired after ...ms while awaiting InitProducerId`error.
{% endhint %}
{% content-ref url="./" %}
[.](./)
{% endcontent-ref %}
## Configuration
To use Kafka with [#aidboxsubscriptiontopic](kafka-aidboxtopicdestination.md#aidboxsubscriptiontopic) you have to create [#aidboxtopicdestination](kafka-aidboxtopicdestination.md#aidboxtopicdestination) resource.
There are two FHIR profiles available to use with Kafka:
for best effort:
```
http://aidbox.app/StructureDefinition/aidboxtopicdestination-kafka-best-effort
```
for _at least once_:
```
http://aidbox.app/StructureDefinition/aidboxtopicdestination-kafka-at-least-once
```
### Available Parameters
{% hint style="info" %}
For additional details see [Kafka Producer Configs Documentation](https://kafka.apache.org/documentation/#producerconfigs)
{% endhint %}
| Parameter name | Value type | Description |
|---|
kafkaTopic * | valueString | The Kafka topic where the data should be sent. |
bootstrapServers * | valueString | Comma-separated string. A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. |
compressionType | valueString | Specify the final compression type for a given topic. This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). |
batchSize | valueInteger | This configuration controls the default batch size in bytes. |
deliveryTimeoutMs | valueInteger | A maximum time limit for reporting the success or failure of a record sent by a producer, covering delays before sending, waiting for broker acknowledgment, and handling retriable errors. |
maxBlockMs | valueInteger | The configuration controls how long the KafkaProducer's send()method will block. |
maxRequestSize | valueInteger | The maximum size of a request in bytes. |
requestTimeoutMs | valueInteger | The maximum amount of time the client will wait for the response of a request. |
sslKeystoreKey | valueString | Private key in the format specified by 'ssl.keystore.type'. |
securityProtocol | valueString | Protocol used to communicate with brokers. |
saslMechanism | valueString | SASL mechanism used for client connections. |
saslJaasConfig | valueString | JAAS login context parameters for SASL connections in the format used by JAAS configuration files. |
saslClientCallbackHandlerClass | valueString | The fully qualified name of a SASL client callback handler class that implements the AuthenticateCallbackHandler interface. |
\* required parameter.
## Examples
### Standalone Kafka (at least once)
Full example see on [Github](https://github.com/Aidbox/examples/tree/main/aidbox-features/aidbox-subscriptions-to-kafka)
POST /fhir/AidboxTopicDestination
content-type: application/json
accept: application/json
{
"resourceType": "AidboxTopicDestination",
"meta": {
"profile": [
"http://aidbox.app/StructureDefinition/aidboxtopicdestination-kafka-at-least-once"
]
},
"kind": "kafka-at-least-once",
"id": "kafka-destination",
"topic": "http://example.org/FHIR/R5/SubscriptionTopic/QuestionnaireResponse-topic",
"parameter": [
{
"name": "kafkaTopic",
"valueString": "aidbox-forms"
},
{
"name": "bootstrapServers",
"valueString": "kafka:29092"
}
]
}
### AWS MSK Kafka with IAM (best effort)
```
POST /fhir/AidboxTopicDestination
content-type: application/json
accept: application/json
{
"resourceType": "AidboxTopicDestination",
"meta": {
"profile": [
"http://aidbox.app/StructureDefinition/aidboxtopicdestination-kafka-best-effort"
]
},
"kind": "kafka-best-effort",
"id": "kafka-destination",
"topic": "http://example.org/FHIR/R5/SubscriptionTopic/QuestionnaireResponse-topic",
"parameter": [
{
"name": "kafkaTopic",
"valueString": "aidbox-forms"
},
{
"name": "bootstrapServers",
"valueString": "<...>.amazonaws.com:9098,<...>.amazonaws.com:9098"
},
{
"name": "securityProtocol",
"valueString": "SASL_SSL"
},
{
"name": "saslMechanism",
"valueString": "AWS_MSK_IAM"
},
{
"name": "saslJaasConfig",
"valueString": "software.amazon.msk.auth.iam.IAMLoginModule required;"
},
{
"name": "saslClientCallbackHandlerClass",
"valueString": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
}
]
}
```
## **Status Introspection**
Aidbox provides `$status` operation which provides short status information on the integration status:
{% tabs %}
{% tab title="Request" %}
```yaml
GET /fhir/AidboxTopicDestination//$status
content-type: application/json
accept: application/json
```
{% endtab %}
{% tab title="Response (best-effort)" %}
{% code title="200 OK" %}
```json
{
"resourceType": "Parameters",
"parameter": [
{
"valueDecimal": 1,
"name": "messagesDelivered"
},
{
"valueDecimal": 0,
"name": "messagesInProcess"
},
{
"valueDecimal": 1,
"name": "messagesLost"
},
{
"valueDateTime": "2024-10-03T08:43:36Z",
"name": "startTimestamp"
},
{
"valueString": "active",
"name": "status"
},
{
"name": "lastErrorDetail",
"part": [
{
"valueString": "Timeout expired after 10000ms while awaiting InitProducerId",
"name": "message"
},
{
"valueDateTime": "2024-10-03T08:44:09Z",
"name": "timestamp"
}
]
}
]
}
```
{% endcode %}
{% endtab %}
{% tab title="Response (at-least-once)" %}
```json
200 OK
{
"resourceType": "Parameters",
"parameter": [
{
"name": "messagesDelivered",
"valueDecimal": 1
},
{
"name": "messagesDeliveryAttempts",
"valueDecimal": 2
},
{
"name": "messagesInProcess",
"valueDecimal": 0
},
{
"name": "messagesQueued",
"valueDecimal": 0
},
{
"name": "startTimestamp",
"valueDateTime": "2024-10-03T08:18:47Z"
},
{
"name": "status",
"valueString": "active"
},
{
"name": "lastErrorDetail",
"part": [
{
"valueString": "Timeout expired after 10000ms while awaiting InitProducerId",
"name": "message"
},
{
"valueDateTime": "2024-10-03T08:19:32Z",
"name": "timestamp"
}
]
}
]
}
```
{% endtab %}
{% endtabs %}
Response parameters for `best-effort`:
| Property | Type | Description |
|---|
startTimestamp | valueDateTime | AidboxTopicDestination start time in UTC. |
status | valueString | AidboxTopicDestination status is always active, which means that AidboxTopicDestination will try to send all received notifications. |
messagesDelivered | valueDecimal | Total number of events that have been successfully delivered. |
messagesInProcess | valueDecimal | Current number of events in the buffer being processed for delivery. |
messagesLost | string | Total number of events that failed to be delivered. |
lastErrorDetail | valueDateTime | Information about errors of the latest failed attempt to send an event. This parameter can be repeated up to 5 times. Includes the following parameters. |
lastErrorDetail.message | string | Error message of the given error. |
lastErrorDetail.timestamp | valueDateTime | Timestamp of the given error. |
Response parameters for `at-least-once`:
| Property | Type | Description |
|---|
messagesDelivered | valueDecimal | Total number of events that have been successfully delivered. |
messagesDeliveryAttempts | valueDecimal | Number of delivery attempts that failed. |
messagesInProcess | valueDecimal | Current number of events in the buffer being processed for delivery. |
messagesQueued | valueDecimal | Number of events pending in the queue for dispatch to the Kafka driver. |
startTimestamp | valueDateTime | AidboxTopicDestination start time in UTC. |
status | valueString | AidboxTopicDestination status is always active, which means that AidboxTopicDestination will try to send all received notifications. |
lastErrorDetail | part | Information about errors of the latest failed attempt to send an event. This parameter can be repeated up to 5 times. Includes the following parameters. |
lastErrorDetail.message | valueString | Error message of the given error. |
lastErrorDetail.timestamp | valueDateTime | Timestamp of the given error. |