Karate Clients - Kafka
Java clients (Consumer and Producer) to interact with Kafka from Karate.
They provide features to consume and produce Kafka events for one or multiple topics.

- How to set it up:
  - Define (if applicable) the Kafka clients dependencies in the project POM
  - Define the Kafka clients configuration in the file src/test/resources/config/kafka/kafka-config-<env>.yml
- How to use it in the karate files:
  - Kafka Clients Features and Usage
POM Configuration
POM Karate Tools
If the project has been generated using the Karate Tools Archetype, the pom will already contain the corresponding configuration.
Add the karatetools dependency in the karate pom:
<properties>
  ...
  <!-- Karate Tools -->
  <karatetools.version>X.X.X</karatetools.version>
</properties>
<dependencies>
  ...
  <!-- Karate Tools -->
  <dependency>
    <groupId>dev.inditex.karate</groupId>
    <artifactId>karatetools-starter</artifactId>
    <version>${karatetools.version}</version>
    <scope>test</scope>
  </dependency>
</dependencies>
POM Client
karatetools-starter already includes the corresponding Kafka dependencies.
If you need to change the dependency version, you can include it in the pom as follows:
<properties>
  ...
  <!-- Karate Clients -->
  <!-- Karate Clients - Kafka -->
  <kafka-clients.version>X.X.X</kafka-clients.version>
  <!-- Karate Clients - Kafka - Avro Generation -->
  <apache-avro.version>X.X.X</apache-avro.version>
  <kafka-avro-serializer.version>X.X.X</kafka-avro-serializer.version>
</properties>
<dependencies>
  ...
  <!-- Karate Clients -->
  <!-- Karate Clients - Kafka -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka-clients.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>${apache-avro.version}</version>
  </dependency>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>${kafka-avro-serializer.version}</version>
  </dependency>
</dependencies>
POM Kafka Events
To process Avro events, the pom.xml must include a dependency on the module that contains the Avro classes of the application events to produce or consume.
- POM Properties
<properties>
  ...
  <!-- Karate Clients - Kafka - AVRO Objects -->
  <xxxxxx-event-module.version>X.X.X</xxxxxx-event-module.version>
  ...
</properties>
- POM Dependencies
<dependencies>
  ...
  <!-- Karate Clients - Kafka - AVRO Objects -->
  <dependency>
    <groupId>com.mycompany.api</groupId>
    <artifactId>xxxxxx-event-module</artifactId>
    <version>${xxxxxx-event-module.version}</version>
  </dependency>
  ...
</dependencies>
Normally this module is the one marked with Amiga marker-project-schema-avro.amiga
Client Configuration
Configuration parameters for the Kafka Clients. They accept any configuration property that can be set in the underlying Kafka clients:
- org.apache.kafka.clients.consumer.KafkaConsumer
- org.apache.kafka.clients.producer.KafkaProducer
For more information on the available configuration properties, see the Kafka Configuration Reference:
- Producer: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
- Consumer: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
- Schema Registry: https://docs.confluent.io/platform/current/schema-registry/connect.html#configuration-options
If the project has been generated using the Karate Tools Archetype, the archetype will have prompted for the creation of the configuration files.
This client can be configured for multi-environment execution with a config file per environment:
\---src
    \---test
        \---resources
            \---config
                \---kafka
                        kafka-config-local.yml
                        ...
                        kafka-config-pre.yml
KafkaClient config - Kafka Cluster and Schema Registry Servers
- bootstrap.servers: Comma-separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster. For example:
  - localhost:39092
- schema.registry.url: Comma-separated list of URLs (protocol://host:port) for Schema Registry instances that can be used to register or look up schemas. For example:
  - http://localhost:38082
KafkaClient config - Producer
Kafka Producer specific properties must start with "producer." so that they are only applied to a Producer client.
- producer.client.id: An id string to pass to the server when making requests. This value is used to identify the client application.
- producer.auto.register.schemas: Specifies whether the Serializer should attempt to register the Schema with Schema Registry. To be used with the AVRO Serializer.
  - true: Automatically register the Schema with Schema Registry. This is the value to set for the LOCAL environment.
  - false: Do not register the Schema with Schema Registry. This is the value to set for REMOTE environments, which have other mechanisms to register the schema.
- producer.key.subject.name.strategy: Determines how to construct the subject name under which the key schema is registered with Schema Registry. To be used with the AVRO Serializer if a custom strategy is needed.
- producer.value.subject.name.strategy: Determines how to construct the subject name under which the value schema is registered with Schema Registry. To be used with the AVRO Serializer if a custom strategy is needed.
- producer.key.serializer: Serializer class for the key that implements the org.apache.kafka.common.serialization.Serializer interface. For example:
  - org.apache.kafka.common.serialization.LongSerializer
  - org.apache.kafka.common.serialization.StringSerializer
- producer.value.serializer: Serializer class for the value that implements the org.apache.kafka.common.serialization.Serializer interface. For example:
  - org.apache.kafka.common.serialization.StringSerializer
  - io.confluent.kafka.serializers.KafkaAvroSerializer
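The prefix rule can be pictured with a small Java sketch. It is only an illustration of one plausible way to scope properties by prefix, not the karatetools implementation: the class PrefixedConfig, the method forClient, and the choice to strip the prefix before handing the property to the client are all assumptions made for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of karatetools): shows how "producer." and
// "consumer." prefixed properties could be scoped to a single client type.
final class PrefixedConfig {

  // Returns the shared properties plus those under the given prefix,
  // with the prefix removed (assumption: the client expects plain Kafka names).
  static Map<String, Object> forClient(Map<String, Object> config, String prefix) {
    Map<String, Object> result = new LinkedHashMap<>();
    // First pass: copy properties that are not scoped to any client type.
    for (Map.Entry<String, Object> entry : config.entrySet()) {
      String key = entry.getKey();
      if (!key.startsWith("producer.") && !key.startsWith("consumer.")) {
        result.put(key, entry.getValue());
      }
    }
    // Second pass: add the properties scoped to the requested client type.
    for (Map.Entry<String, Object> entry : config.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith(prefix)) {
        result.put(key.substring(prefix.length()), entry.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Object> config = new LinkedHashMap<>();
    config.put("bootstrap.servers", "localhost:39092");
    config.put("producer.client.id", "karate-producer");
    config.put("consumer.group.id", "karate-group");
    // Only the shared and the "producer." properties are kept.
    System.out.println(forClient(config, "producer."));
  }
}
```

With this scheme a shared property such as bootstrap.servers reaches both clients, while producer.client.id never reaches a Consumer client.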
KafkaClient config - Consumer
Kafka Consumer specific properties must start with "consumer." so that they are only applied to a Consumer client.
- consumer.group.id: A unique string that identifies the consumer group this consumer belongs to.
  - This value will be used as the default group for the consumer.
  - If not provided, the default value karate-kafka-default-consumer-group is used.
  - It can be overridden in specific calls like consume(final String topic, final String group)
- consumer.key.deserializer: Deserializer class for the key that implements the org.apache.kafka.common.serialization.Deserializer interface. Example values:
  - org.apache.kafka.common.serialization.LongDeserializer
  - org.apache.kafka.common.serialization.StringDeserializer
- consumer.value.deserializer: Deserializer class for the value that implements the org.apache.kafka.common.serialization.Deserializer interface. Example values:
  - org.apache.kafka.common.serialization.StringDeserializer
  - io.confluent.kafka.serializers.KafkaAvroDeserializer
- consumer.auto.offset.reset: What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted).
  - Valid values are (anything else throws an exception to the consumer):
    - earliest: Automatically reset the offset to the earliest offset
    - latest: Automatically reset the offset to the latest offset
    - none: Throw an exception to the consumer if no previous offset is found for the consumer's group
  - This value will be used as the default value for the consumer.
  - If not provided, the default value earliest is used.
  - It can be overridden in specific calls like consume(final String topic, final String group, final String offset)
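The precedence described for the group and the offset (per-call argument, then configured value, then built-in default) can be sketched in Java as follows. This is a minimal illustration under assumptions: ConsumerDefaults and its methods are hypothetical names, and the real client may resolve these values differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not the karatetools implementation): resolves the
// consumer group and offset using the precedence described in the text.
final class ConsumerDefaults {
  static final String DEFAULT_GROUP = "karate-kafka-default-consumer-group";
  static final String DEFAULT_OFFSET = "earliest";

  // Per-call value wins, then the configured value, then the built-in default.
  static String resolve(String perCall, Object configured, String fallback) {
    if (perCall != null) {
      return perCall;
    }
    if (configured != null) {
      return configured.toString();
    }
    return fallback;
  }

  static String group(Map<String, Object> config, String perCallGroup) {
    return resolve(perCallGroup, config.get("consumer.group.id"), DEFAULT_GROUP);
  }

  static String offset(Map<String, Object> config, String perCallOffset) {
    String value = resolve(perCallOffset, config.get("consumer.auto.offset.reset"), DEFAULT_OFFSET);
    // Only the three documented values are accepted.
    if (!value.equals("earliest") && !value.equals("latest") && !value.equals("none")) {
      throw new IllegalArgumentException("Invalid auto.offset.reset value: " + value);
    }
    return value;
  }

  public static void main(String[] args) {
    Map<String, Object> config = new HashMap<>();
    config.put("consumer.group.id", "KARATETOOLS-local");
    System.out.println(group(config, null));                            // configured group
    System.out.println(group(config, "karate-kafka-consumer-group-1")); // per-call group
    System.out.println(offset(config, null));                           // built-in default
  }
}
```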
KafkaClient config - Authentication
To access secure Kafka clusters, the following authentication properties must be set:
- security.protocol: Protocol used to communicate with brokers. Valid values are: SASL_PLAINTEXT, SASL_SSL
- sasl.mechanism: Mechanism used for client connections. Valid values are: PLAIN, SCRAM-SHA-512
- sasl.jaas.config: JAAS login context parameters for SASL connections. For example:
  - org.apache.kafka.common.security.plain.PlainLoginModule required username='…' password='…';
  - org.apache.kafka.common.security.scram.ScramLoginModule required username='…' password='…';
# Karate Tools - Kafka Clients Properties
# bootstrap.servers: Comma-separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers: localhost:39092
# Security Protocol: Protocol used to communicate with brokers. Valid values are: SASL_PLAINTEXT, SASL_SSL
security.protocol: SASL_PLAINTEXT
# SASL mechanism: Mechanism used for client connections. Valid values are: PLAIN, SCRAM-SHA-512
sasl.mechanism: PLAIN
# SASL JAAS Configuration: JAAS login context parameters for SASL connections.
# Example Values:
# org.apache.kafka.common.security.plain.PlainLoginModule required username='...' password='...';
# org.apache.kafka.common.security.scram.ScramLoginModule required username='...' password='...';
sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username='saslplain' password='saslplain-pwd';"
# schema.registry.url: Comma-separated list of URLs (protocol://host:port) for Schema Registry instances that can be used to register or look up schemas
schema.registry.url: http://localhost:38082
# Schema Registry Basic Auth Credentials Source: Specify how to pick the credentials for the Basic authentication header.
# The supported values are URL, USER_INFO and SASL_INHERIT
basic.auth.credentials.source: USER_INFO
# Schema Registry User Info Config: Specify the user info for the Basic authentication in the form of {username}:{password}.
basic.auth.user.info: "schemaregistry:schemaregistry-pwd"
#
# Kafka Producer properties - must start with "producer."
#
# producer.client.id: An id string to pass to the server when making requests.
producer.client.id: KARATELABSS
# producer.auto.register.schemas: Specify if the Serializer should attempt to register the Schema with Schema Registry
producer.auto.register.schemas: true
# producer.key.subject.name.strategy: Determines how to construct the subject name under which the key schema is registered with Schema Registry.
# producer.key.subject.name.strategy: TO_BE_COMPLETED
# producer.value.subject.name.strategy: Determines how to construct the subject name under which the value schema is registered with Schema Registry.
# producer.value.subject.name.strategy: TO_BE_COMPLETED
# producer.key.serializer: Serializer class for key that implements the 'org.apache.kafka.common.serialization.Serializer' interface
# producer.key.serializer Example Values:
# org.apache.kafka.common.serialization.LongSerializer
# org.apache.kafka.common.serialization.StringSerializer
producer.key.serializer: org.apache.kafka.common.serialization.StringSerializer
# producer.value.serializer: Serializer class for value that implements the 'org.apache.kafka.common.serialization.Serializer' interface
# producer.value.serializer Example Values:
# org.apache.kafka.common.serialization.StringSerializer
# io.confluent.kafka.serializers.KafkaAvroSerializer
producer.value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
#
# Kafka Consumer properties - must start with "consumer."
#
# A unique string that identifies the consumer group this consumer belongs to
# This value will be used as default group for the consumer
# It can be overridden in specific calls like consume(final String topic, final String group)
consumer.group.id: KARATETOOLS-local
# Deserializer class for key that implements the 'org.apache.kafka.common.serialization.Deserializer' interface
# Deserializer class for key Example Values:
# org.apache.kafka.common.serialization.LongDeserializer
# org.apache.kafka.common.serialization.StringDeserializer
consumer.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Deserializer class for value that implements the 'org.apache.kafka.common.serialization.Deserializer' interface
# Deserializer class for value Example Values:
# org.apache.kafka.common.serialization.StringDeserializer
# io.confluent.kafka.serializers.KafkaAvroDeserializer
consumer.value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
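The sasl.jaas.config value in the configuration above follows a fixed pattern: login module class, the keyword required, then the credentials, terminated by a semicolon. The following Java sketch assembles that string for the two documented mechanisms. JaasConfig and its methods are hypothetical helpers for illustration only; in practice the value is simply written in the YAML file.

```java
// Hypothetical helper (not part of karatetools): builds the sasl.jaas.config
// value for the SASL mechanisms listed in this document.
final class JaasConfig {

  // Maps the sasl.mechanism value to the matching Kafka login module class.
  static String loginModule(String mechanism) {
    switch (mechanism) {
      case "PLAIN":
        return "org.apache.kafka.common.security.plain.PlainLoginModule";
      case "SCRAM-SHA-512":
        return "org.apache.kafka.common.security.scram.ScramLoginModule";
      default:
        throw new IllegalArgumentException("Unsupported sasl.mechanism: " + mechanism);
    }
  }

  // Produces: <login module> required username='...' password='...';
  static String build(String mechanism, String username, String password) {
    return loginModule(mechanism)
        + " required username='" + username + "' password='" + password + "';";
  }

  public static void main(String[] args) {
    // Same credentials as the sample configuration file.
    System.out.println(build("PLAIN", "saslplain", "saslplain-pwd"));
  }
}
```

Note that the sketch does not escape quotes inside the credentials, so it is only suitable for simple usernames and passwords.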
Client Features and Usage - Producer
Instantiate KafkaProducerClient
New instance of the KafkaProducerClient providing the configuration as a map loaded from a yaml file.
public KafkaProducerClient(final Map<Object, Object> config)
# public KafkaProducerClient(final Map<Object, Object> config)
# Instantiate KafkaProducerClient
Given def config = read('classpath:config/kafka/kafka-config-' + karate.env + '.yml')
Given def KafkaProducerClient = Java.type('dev.inditex.karate.kafka.KafkaProducerClient')
Given def kafkaProducerClient = new KafkaProducerClient(config)
Check if Kafka is available
Checks if the Kafka connection can be established.
Returns true if the connection is available, false otherwise.
public Boolean available()
# public Boolean available()
When def producerAvailable = kafkaProducerClient.available()
Then if (!producerAvailable) karate.fail('Kafka Client not available')
Send a record to a topic
Send a record to a topic, with or without headers.
The record being sent is a Java instance of the corresponding business event.
For example:
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def event = new KarateEvent("A1", "karate-A1", 11)
Send a record to a topic without headers
public void send(final String topic, final Object event)
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def eventA1 = new KarateEvent("A1", "karate-A1", 11)
# public void send(final String topic, final Object event)
When kafkaProducerClient.send(topicA, eventA1)
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"
Given string eventA1 = { "id": "A1", "name": "karate-A1", "value": 11 }
# public void send(final String topic, final Object event)
When kafkaProducerClient.send(topicA, eventA1)
Send a record to a topic with Headers
public void send(final String topic, final Object event, final Map<String, List<String>> headers)
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"
Given def headersA = { "contentType": [ "application/*+avro" ], "status": [ "A" ] }
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def eventA2 = new KarateEvent("A2", "karate-A2", 12)
# public void send(final String topic, final Object event, final Map<String, List<String>> headers)
When kafkaProducerClient.send(topicA, eventA2, headersA)
Client Features and Usage - Consumer
Instantiate KafkaConsumerClient
New instance of the KafkaConsumerClient providing the configuration as a map loaded from a yaml file.
public KafkaConsumerClient(final Map<Object, Object> config)
# public KafkaConsumerClient(final Map<Object, Object> config)
# Instantiate KafkaConsumerClient
Given def config = read('classpath:config/kafka/kafka-config-' + karate.env + '.yml')
Given def KafkaConsumerClient = Java.type('dev.inditex.karate.kafka.KafkaConsumerClient')
Given def kafkaConsumerClient = new KafkaConsumerClient(config)
Check if Kafka is available
Checks if the Kafka connection can be established.
Returns true if the connection is available, false otherwise.
public Boolean available()
# public Boolean available()
When def consumerAvailable = kafkaConsumerClient.available()
Then if (!consumerAvailable) karate.fail('Kafka Client not available')
# public Boolean available()
When def consumerAvailable = kafkaConsumerClient.available()
When def producerAvailable = kafkaProducerClient.available()
Then if (!consumerAvailable || !producerAvailable) karate.fail('Kafka Client not available')
Consume events from one topic
Consume events from a specific topic, optionally specifying group, offset and timeout.
When the timeout is not provided, the client default (5000 ms) is used.
Returns a JSON array representing the obtained messages, where each element is a map of field name to message value.
For example:
[
{ "id": "A1", "name": "karate-A1", "value": 11 },
{ "id": "A2", "name": "karate-A2", "value": 12 }
]
Consume events from one topic
public List<Map<String, Object>> consume(final String topic)
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def eventA1 = new KarateEvent("A1", "karate-A1", 11)
Given def eventA2 = new KarateEvent("A2", "karate-A2", 12)
# public List<Map<String, Object>> consume(final String topic)
When def messagesTopicA = kafkaConsumerClient.consume(topicA)
Then karate.log('messagesTopicA=', messagesTopicA)
Then match messagesTopicA == '#[2]'
Then match messagesTopicA[0] == karate.toJson(eventA1)
Then match messagesTopicA[1] == karate.toJson(eventA2)
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A-String"
Given string eventA1 = { "id": "A1", "name": "karate-A1", "value": 11 }
Given string eventA2 = { "id": "A2", "name": "karate-A2", "value": 12 }
# public List<Map<String, Object>> consume(final String topic)
When def messagesTopicA = kafkaConsumerClient.consume(topicA)
Then karate.log('messagesTopicA=', messagesTopicA)
Then match messagesTopicA == '#[2]'
Then match messagesTopicA[0] == karate.fromString(eventA1)
Then match messagesTopicA[1] == karate.fromString(eventA2)
Consume events from one topic with timeout
public List<Map<String, Object>> consume(final String topic, final long timeout)
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def eventA3 = new KarateEvent("A3", "karate-A3", 13)
Given def eventA4 = new KarateEvent("A4", "karate-A4", 14)
# public List<Map<String, Object>> consume(final String topic, final long timeout)
When def messagesTopicA = kafkaConsumerClient.consume(topicA, 2000)
Then karate.log('messagesTopicA=', messagesTopicA)
Then match messagesTopicA == '#[2]'
Then match messagesTopicA[0] == karate.toJson(eventA3)
Then match messagesTopicA[1] == karate.toJson(eventA4)
Consume events from one topic with group
public List<Map<String, Object>> consume(final String topic, final String group)
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def eventA1 = new KarateEvent("A1", "karate-A1", 11)
Given def eventA2 = new KarateEvent("A2", "karate-A2", 12)
Given def eventA3 = new KarateEvent("A3", "karate-A3", 13)
Given def eventA4 = new KarateEvent("A4", "karate-A4", 14)
Given def group1 = "karate-kafka-consumer-group-1"
# public List<Map<String, Object>> consume(final String topic, final String group)
When def messagesTopicA = kafkaConsumerClient.consume(topicA, group1)
Then karate.log('messagesTopicA=', messagesTopicA)
Then match messagesTopicA == '#[4]'
Then match messagesTopicA[0] == karate.toJson(eventA1)
Then match messagesTopicA[1] == karate.toJson(eventA2)
Then match messagesTopicA[2] == karate.toJson(eventA3)
Then match messagesTopicA[3] == karate.toJson(eventA4)
Consume events from one topic with group and offset
public List<Map<String, Object>> consume(final String topic, final String group, final String offset)
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def eventA1 = new KarateEvent("A1", "karate-A1", 11)
Given def eventA2 = new KarateEvent("A2", "karate-A2", 12)
Given def eventA3 = new KarateEvent("A3", "karate-A3", 13)
Given def eventA4 = new KarateEvent("A4", "karate-A4", 14)
Given def group2 = "karate-kafka-consumer-group-2"
# public List<Map<String, Object>> consume(final String topic, final String group, final String offset)
When def messagesTopicA = kafkaConsumerClient.consume(topicA, group2, 'earliest')
Then karate.log('messagesTopicA=', messagesTopicA)
Then match messagesTopicA == '#[4]'
Then match messagesTopicA[0] == karate.toJson(eventA1)
Then match messagesTopicA[1] == karate.toJson(eventA2)
Then match messagesTopicA[2] == karate.toJson(eventA3)
Then match messagesTopicA[3] == karate.toJson(eventA4)
Consume events from one topic with group and timeout
public List<Map<String, Object>> consume(final String topic, final String group, final long timeout)
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def eventA1 = new KarateEvent("A1", "karate-A1", 11)
Given def eventA2 = new KarateEvent("A2", "karate-A2", 12)
Given def eventA3 = new KarateEvent("A3", "karate-A3", 13)
Given def eventA4 = new KarateEvent("A4", "karate-A4", 14)
Given def group3 = "karate-kafka-consumer-group-3"
# public List<Map<String, Object>> consume(final String topic, final String group, final long timeout)
When def messagesTopicA = kafkaConsumerClient.consume(topicA, group3, 3000)
Then karate.log('messagesTopicA=', messagesTopicA)
Then match messagesTopicA == '#[4]'
Then match messagesTopicA[0] == karate.toJson(eventA1)
Then match messagesTopicA[1] == karate.toJson(eventA2)
Then match messagesTopicA[2] == karate.toJson(eventA3)
Then match messagesTopicA[3] == karate.toJson(eventA4)
Consume events from one topic with group, offset and timeout
public List<Map<String, Object>> consume(final String topic, final String group, final String offset, final long timeout)
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def eventA1 = new KarateEvent("A1", "karate-A1", 11)
Given def eventA2 = new KarateEvent("A2", "karate-A2", 12)
Given def eventA3 = new KarateEvent("A3", "karate-A3", 13)
Given def eventA4 = new KarateEvent("A4", "karate-A4", 14)
Given def group4 = "karate-kafka-consumer-group-4"
# public List<Map<String, Object>> consume(final String topic, final String group, final String offset, final long timeout)
When def messagesTopicA = kafkaConsumerClient.consume(topicA, group4, 'latest', 3000)
Then karate.log('messagesTopicA=', messagesTopicA)
Then match messagesTopicA == '#[4]'
Then match messagesTopicA[0] == karate.toJson(eventA1)
Then match messagesTopicA[1] == karate.toJson(eventA2)
Then match messagesTopicA[2] == karate.toJson(eventA3)
Then match messagesTopicA[3] == karate.toJson(eventA4)
Consume events from multiple topics
Consume events from multiple topics, optionally specifying group, offset and timeout.
When the timeout is not provided, the client default (5000 ms) is used.
Returns a JSON map where the keys are the topics and the values are JSON arrays representing the obtained events, where each element is a map of field name to message value.
For example:
{
"dev.inditex.karate.kafka.public.A": [
{ "id": "A1", "name": "karate-A1", "value": 11 },
{ "id": "A2", "name": "karate-A2", "value": 12 }
],
"dev.inditex.karate.kafka.public.B": [
{ "id": "B1", "name": "karate-B1", "value": 21 },
{ "id": "B2", "name": "karate-B2", "value": 22 }
]
}
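On the Java side this structure corresponds to the return type Map<String, List<Map<String, Object>>>. The sketch below builds and reads such a map with plain collections, to make the nesting explicit. TopicMessages and its methods are hypothetical names used only for illustration, and returning an empty list for a topic without messages is an assumption, not documented client behaviour.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of karatetools): models the multi-topic result
// as topic -> list of messages, each message being a field -> value map.
final class TopicMessages {

  // Builds one message with the fields used in the examples of this document.
  static Map<String, Object> event(String id, String name, int value) {
    Map<String, Object> fields = new LinkedHashMap<>();
    fields.put("id", id);
    fields.put("name", name);
    fields.put("value", value);
    return fields;
  }

  // Appends a message under its topic, creating the list on first use.
  static void add(Map<String, List<Map<String, Object>>> byTopic, String topic, Map<String, Object> message) {
    byTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(message);
  }

  // Returns the messages of a topic, or an empty list if the topic is absent (assumption).
  static List<Map<String, Object>> forTopic(Map<String, List<Map<String, Object>>> byTopic, String topic) {
    return byTopic.getOrDefault(topic, Collections.<Map<String, Object>>emptyList());
  }

  public static void main(String[] args) {
    Map<String, List<Map<String, Object>>> byTopic = new LinkedHashMap<>();
    add(byTopic, "dev.inditex.karate.kafka.public.A", event("A1", "karate-A1", 11));
    add(byTopic, "dev.inditex.karate.kafka.public.A", event("A2", "karate-A2", 12));
    add(byTopic, "dev.inditex.karate.kafka.public.B", event("B1", "karate-B1", 21));
    System.out.println(forTopic(byTopic, "dev.inditex.karate.kafka.public.A"));
  }
}
```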
Consume events from multiple topics
public Map<String, List<Map<String, Object>>> consume(final String[] topics)
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicC = "dev.inditex.karate.kafka.public.C"
Given def topicD = "dev.inditex.karate.kafka.public.D"
Given def topics = [ "#(topicC)", "#(topicD)" ]
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def eventC1 = new KarateEvent("C1", "karate-C1", 31)
Given def eventC2 = new KarateEvent("C2", "karate-C2", 32)
Given def eventD1 = new KarateEvent("D1", "karate-D1", 41)
Given def eventD2 = new KarateEvent("D2", "karate-D2", 42)
# public Map<String, List<Map<String, Object>>> consume(final String[] topics)
When def messagesTopics = kafkaConsumerClient.consume(topics)
Then karate.log('messagesTopics=', messagesTopics)
Then def messagesTopicC = karate.jsonPath(messagesTopics, "$['" + topicC + "']")
Then karate.log('messagesTopicC=', messagesTopicC)
Then match messagesTopicC == '#[2]'
Then match messagesTopicC[0] == karate.toJson(eventC1)
Then match messagesTopicC[1] == karate.toJson(eventC2)
Then def messagesTopicD = karate.jsonPath(messagesTopics, "$['" + topicD + "']")
Then karate.log('messagesTopicD=', messagesTopicD)
Then match messagesTopicD == '#[2]'
Then match messagesTopicD[0] == karate.toJson(eventD1)
Then match messagesTopicD[1] == karate.toJson(eventD2)
Consume events from multiple topics with timeout
public Map<String, List<Map<String, Object>>> consume(final String[] topics, final long timeout)
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicC = "dev.inditex.karate.kafka.public.C"
Given def topicD = "dev.inditex.karate.kafka.public.D"
Given def topics = [ "#(topicC)", "#(topicD)" ]
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def eventC3 = new KarateEvent("C3", "karate-C3", 33)
Given def eventC4 = new KarateEvent("C4", "karate-C4", 34)
Given def eventD3 = new KarateEvent("D3", "karate-D3", 43)
Given def eventD4 = new KarateEvent("D4", "karate-D4", 44)
# public Map<String, List<Map<String, Object>>> consume(final String[] topics, final long timeout)
When def messagesTopics = kafkaConsumerClient.consume(topics, 2000)
Then karate.log('messagesTopics=', messagesTopics)
Then def messagesTopicC = karate.jsonPath(messagesTopics, "$['" + topicC + "']")
Then karate.log('messagesTopicC=', messagesTopicC)
Then match messagesTopicC == '#[2]'
Then match messagesTopicC[0] == karate.toJson(eventC3)
Then match messagesTopicC[1] == karate.toJson(eventC4)
Then def messagesTopicD = karate.jsonPath(messagesTopics, "$['" + topicD + "']")
Then karate.log('messagesTopicD=', messagesTopicD)
Then match messagesTopicD == '#[2]'
Then match messagesTopicD[0] == karate.toJson(eventD3)
Then match messagesTopicD[1] == karate.toJson(eventD4)
Consume events from multiple topics with group
public Map<String, List<Map<String, Object>>> consume(final String[] topics, final String group)
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicC = "dev.inditex.karate.kafka.public.C"
Given def topicD = "dev.inditex.karate.kafka.public.D"
Given def topics = [ "#(topicC)", "#(topicD)" ]
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def eventC1 = new KarateEvent("C1", "karate-C1", 31)
Given def eventC2 = new KarateEvent("C2", "karate-C2", 32)
Given def eventC3 = new KarateEvent("C3", "karate-C3", 33)
Given def eventC4 = new KarateEvent("C4", "karate-C4", 34)
Given def eventD1 = new KarateEvent("D1", "karate-D1", 41)
Given def eventD2 = new KarateEvent("D2", "karate-D2", 42)
Given def eventD3 = new KarateEvent("D3", "karate-D3", 43)
Given def eventD4 = new KarateEvent("D4", "karate-D4", 44)
Given def group1 = "karate-kafka-consumer-group-1"
# public Map<String, List<Map<String, Object>>> consume(final String[] topics, final String group)
When def messagesTopics = kafkaConsumerClient.consume(topics, group1)
Then karate.log('messagesTopics=', messagesTopics)
Then def messagesTopicC = karate.jsonPath(messagesTopics, "$['" + topicC + "']")
Then karate.log('messagesTopicC=', messagesTopicC)
Then match messagesTopicC == '#[4]'
Then match messagesTopicC[0] == karate.toJson(eventC1)
Then match messagesTopicC[1] == karate.toJson(eventC2)
Then match messagesTopicC[2] == karate.toJson(eventC3)
Then match messagesTopicC[3] == karate.toJson(eventC4)
Then def messagesTopicD = karate.jsonPath(messagesTopics, "$['" + topicD + "']")
Then karate.log('messagesTopicD=', messagesTopicD)
Then match messagesTopicD == '#[4]'
Then match messagesTopicD[0] == karate.toJson(eventD1)
Then match messagesTopicD[1] == karate.toJson(eventD2)
Then match messagesTopicD[2] == karate.toJson(eventD3)
Then match messagesTopicD[3] == karate.toJson(eventD4)
Consume events from multiple topics with group and offset
public Map<String, List<Map<String, Object>>> consume(final String[] topics, final String group, final String offset)
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicC = "dev.inditex.karate.kafka.public.C"
Given def topicD = "dev.inditex.karate.kafka.public.D"
Given def topics = [ "#(topicC)", "#(topicD)" ]
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def eventC1 = new KarateEvent("C1", "karate-C1", 31)
Given def eventC2 = new KarateEvent("C2", "karate-C2", 32)
Given def eventC3 = new KarateEvent("C3", "karate-C3", 33)
Given def eventC4 = new KarateEvent("C4", "karate-C4", 34)
Given def eventD1 = new KarateEvent("D1", "karate-D1", 41)
Given def eventD2 = new KarateEvent("D2", "karate-D2", 42)
Given def eventD3 = new KarateEvent("D3", "karate-D3", 43)
Given def eventD4 = new KarateEvent("D4", "karate-D4", 44)
Given def group2 = "karate-kafka-consumer-group-2"
# public Map<String, List<Map<String, Object>>> consume(final String[] topics, final String group, final String offset)
When def messagesTopics = kafkaConsumerClient.consume(topics, group2, 'earliest')
Then karate.log('messagesTopics=', messagesTopics)
Then def messagesTopicC = karate.jsonPath(messagesTopics, "$['" + topicC + "']")
Then karate.log('messagesTopicC=', messagesTopicC)
Then match messagesTopicC == '#[4]'
Then match messagesTopicC[0] == karate.toJson(eventC1)
Then match messagesTopicC[1] == karate.toJson(eventC2)
Then match messagesTopicC[2] == karate.toJson(eventC3)
Then match messagesTopicC[3] == karate.toJson(eventC4)
Then def messagesTopicD = karate.jsonPath(messagesTopics, "$['" + topicD + "']")
Then karate.log('messagesTopicD=', messagesTopicD)
Then match messagesTopicD == '#[4]'
Then match messagesTopicD[0] == karate.toJson(eventD1)
Then match messagesTopicD[1] == karate.toJson(eventD2)
Then match messagesTopicD[2] == karate.toJson(eventD3)
Then match messagesTopicD[3] == karate.toJson(eventD4)
Consume events from multiple topics with group and timeout
public Map<String, List<Map<String, Object>>> consume(final String[] topics, final String group, final long timeout)
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicC = "dev.inditex.karate.kafka.public.C"
Given def topicD = "dev.inditex.karate.kafka.public.D"
Given def topics = [ "#(topicC)", "#(topicD)" ]
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def eventC1 = new KarateEvent("C1", "karate-C1", 31)
Given def eventC2 = new KarateEvent("C2", "karate-C2", 32)
Given def eventC3 = new KarateEvent("C3", "karate-C3", 33)
Given def eventC4 = new KarateEvent("C4", "karate-C4", 34)
Given def eventD1 = new KarateEvent("D1", "karate-D1", 41)
Given def eventD2 = new KarateEvent("D2", "karate-D2", 42)
Given def eventD3 = new KarateEvent("D3", "karate-D3", 43)
Given def eventD4 = new KarateEvent("D4", "karate-D4", 44)
Given def group3 = "karate-kafka-consumer-group-3"
# public Map<String, List<Map<String, Object>>> consume(final String[] topics, final String group, final long timeout)
When def messagesTopics = kafkaConsumerClient.consume(topics, group3, 3000)
Then karate.log('messagesTopics=', messagesTopics)
Then def messagesTopicC = karate.jsonPath(messagesTopics, "$['" + topicC + "']")
Then karate.log('messagesTopicC=', messagesTopicC)
Then match messagesTopicC == '#[4]'
Then match messagesTopicC[0] == karate.toJson(eventC1)
Then match messagesTopicC[1] == karate.toJson(eventC2)
Then match messagesTopicC[2] == karate.toJson(eventC3)
Then match messagesTopicC[3] == karate.toJson(eventC4)
Then def messagesTopicD = karate.jsonPath(messagesTopics, "$['" + topicD + "']")
Then karate.log('messagesTopicD=', messagesTopicD)
Then match messagesTopicD == '#[4]'
Then match messagesTopicD[0] == karate.toJson(eventD1)
Then match messagesTopicD[1] == karate.toJson(eventD2)
Then match messagesTopicD[2] == karate.toJson(eventD3)
Then match messagesTopicD[3] == karate.toJson(eventD4)
Consume events from multiple topics with group, offset and timeout
public Map<String, List<Map<String, Object>>> consume(final String[] topics, final String group, final String offset, final long timeout)
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicC = "dev.inditex.karate.kafka.public.C"
Given def topicD = "dev.inditex.karate.kafka.public.D"
Given def topics = [ "#(topicC)", "#(topicD)" ]
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def eventC1 = new KarateEvent("C1", "karate-C1", 31)
Given def eventC2 = new KarateEvent("C2", "karate-C2", 32)
Given def eventC3 = new KarateEvent("C3", "karate-C3", 33)
Given def eventC4 = new KarateEvent("C4", "karate-C4", 34)
Given def eventD1 = new KarateEvent("D1", "karate-D1", 41)
Given def eventD2 = new KarateEvent("D2", "karate-D2", 42)
Given def eventD3 = new KarateEvent("D3", "karate-D3", 43)
Given def eventD4 = new KarateEvent("D4", "karate-D4", 44)
Given def group4 = "karate-kafka-consumer-group-4"
# public Map<String, List<Map<String, Object>>> consume(final String[] topics, final String group, final String offset, final long timeout)
When def messagesTopics = kafkaConsumerClient.consume(topics, group4, 'latest', 3000)
Then karate.log('messagesTopics=', messagesTopics)
Then def messagesTopicC = karate.jsonPath(messagesTopics, "$['" + topicC + "']")
Then karate.log('messagesTopicC=', messagesTopicC)
Then match messagesTopicC == '#[4]'
Then match messagesTopicC[0] == karate.toJson(eventC1)
Then match messagesTopicC[1] == karate.toJson(eventC2)
Then match messagesTopicC[2] == karate.toJson(eventC3)
Then match messagesTopicC[3] == karate.toJson(eventC4)
Then def messagesTopicD = karate.jsonPath(messagesTopics, "$['" + topicD + "']")
Then karate.log('messagesTopicD=', messagesTopicD)
Then match messagesTopicD == '#[4]'
Then match messagesTopicD[0] == karate.toJson(eventD1)
Then match messagesTopicD[1] == karate.toJson(eventD2)
Then match messagesTopicD[2] == karate.toJson(eventD3)
Then match messagesTopicD[3] == karate.toJson(eventD4)