Karate Clients - Kafka

Java Clients (Consumer and Producer) to interact with Kafka from Karate.

It provides features to consume and produce Kafka events for one or multiple topics.

POM Configuration

POM Karate Tools

If the project has been generated using the Karate Tools Archetype, the pom will already contain the corresponding configuration.
Otherwise, add the karatetools dependency to the karate pom:
<properties>
  ...
  <!-- Karate Tools -->
  <karatetools.version>X.X.X</karatetools.version>
</properties>

<dependencies>
  ...
  <!-- Karate Tools -->
  <dependency>
    <groupId>dev.inditex.karate</groupId>
    <artifactId>karatetools-starter</artifactId>
    <version>${karatetools.version}</version>
    <scope>test</scope>
  </dependency>
</dependencies>

POM Client

karatetools-starter already includes the corresponding Kafka dependencies.

If you need to change the dependency versions, you can override them in the pom as follows:
  <properties>
    ...
    <!-- Karate Clients -->
    <!-- Karate Clients - Kafka  -->
    <kafka-clients.version>X.X.X</kafka-clients.version>
    <!-- Karate Clients - Kafka - Avro Generation -->
    <apache-avro.version>X.X.X</apache-avro.version>
    <kafka-avro-serializer.version>X.X.X</kafka-avro-serializer.version>
  </properties>

  <dependencies>
    ...
    <!-- Karate Clients -->
    <!-- Karate Clients - Kafka -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka-clients.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>${apache-avro.version}</version>
    </dependency>
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-avro-serializer</artifactId>
      <version>${kafka-avro-serializer.version}</version>
    </dependency>
  </dependencies>

POM Kafka Events

In order to process Avro events, the dependency on the module containing the Avro classes of the corresponding application events (to produce or consume) must be included in the pom.xml.

  • POM Properties

      <properties>
        ...
        <!-- Karate Clients - Kafka - AVRO Objects -->
        <xxxxxx-event-module.version>X.X.X</xxxxxx-event-module.version>
        ...
      </properties>
  • POM Dependencies

      <dependencies>
        ...
        <!-- Karate Clients - Kafka - AVRO Objects -->
        <dependency>
          <groupId>com.mycompany.api</groupId>
          <artifactId>xxxxxx-event-module</artifactId>
          <version>${xxxxxx-event-module.version}</version>
        </dependency>
        ...
      </dependencies>
Normally this module is the one marked with the Amiga marker file marker-project-schema-avro.amiga.

Client Configuration

Configuration parameters for the Kafka clients. Any configuration property that can be set on the underlying Kafka clients is accepted:

  • org.apache.kafka.clients.consumer.KafkaConsumer

  • org.apache.kafka.clients.producer.KafkaProducer

For more information on the available configuration properties, see the Kafka Configuration Reference.

If the project has been generated using the Karate Tools Archetype, the archetype will already have prompted for the creation of the configuration files.

This client can be configured for multi-environment execution with a config file per environment:

\---src
    \---test
        \---resources
            \---config
                \---kafka
                    kafka-config-local.yml
                    ...
                    kafka-config-pre.yml
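
Each client then loads the configuration file matching the active karate.env, as shown in the usage sections below:

Given def config = read('classpath:config/kafka/kafka-config-' + karate.env + '.yml')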

KafkaClient config - Kafka Cluster and Schema Registry Servers

  • bootstrap.servers: Comma-separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster. For example:

    • localhost:39092

  • schema.registry.url: Comma-separated list of URLs (protocol://host:port) for Schema Registry instances that can be used to register or look up schemas. For example:

    • http://localhost:38082

KafkaClient config - Producer

Kafka Producer specific properties must start with the producer. prefix so they are only applied to a Producer client.

  • producer.client.id: An id string to pass to the server when making requests. This value is used to identify the client application.

  • producer.auto.register.schemas: Specify if the Serializer should attempt to register the Schema with Schema Registry. To be used with AVRO Serializer.

    • true: Automatically register the Schema with Schema Registry. This is the value to be set for the LOCAL environment.

    • false: Do not register the Schema with Schema Registry. This is the value to be set for REMOTE environments, which use other mechanisms to register the schema.

  • producer.key.subject.name.strategy: Determines how to construct the subject name under which the key schema is registered with Schema Registry. To be used with the AVRO Serializer if a custom strategy is needed (see the example after this list).

  • producer.value.subject.name.strategy: Determines how to construct the subject name under which the value schema is registered with Schema Registry. To be used with the AVRO Serializer if a custom strategy is needed.

  • producer.key.serializer: Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface. For example:

    • org.apache.kafka.common.serialization.LongSerializer

    • org.apache.kafka.common.serialization.StringSerializer

  • producer.value.serializer: Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. For example:

    • org.apache.kafka.common.serialization.StringSerializer

    • io.confluent.kafka.serializers.KafkaAvroSerializer
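
If a custom subject name strategy is needed, the value is the fully qualified class name of the strategy. A minimal sketch, assuming the Confluent-provided TopicRecordNameStrategy (bundled with kafka-avro-serializer) fits your subject naming convention:

# Illustrative only: custom subject name strategy for key and value schemas
producer.key.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
producer.value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy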

KafkaClient config - Consumer

Kafka Consumer specific properties must start with the consumer. prefix so they are only applied to a Consumer client.

  • consumer.group.id: A unique string that identifies the consumer group this consumer belongs to.

    • This value will be used as the default group for the consumer

    • If not provided, it will use the default value karate-kafka-default-consumer-group

    • It can be overwritten in specific calls like consume(final String topic, final String group)

  • consumer.key.deserializer: Deserializer class for key that implements the org.apache.kafka.common.serialization.Deserializer interface

    • Kafka Consumer Deserializer class for key Example Values:

      • org.apache.kafka.common.serialization.LongDeserializer

      • org.apache.kafka.common.serialization.StringDeserializer

  • consumer.value.deserializer: Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface

    • Kafka Consumer Deserializer class for value Example Values:

      • org.apache.kafka.common.serialization.StringDeserializer

      • io.confluent.kafka.serializers.KafkaAvroDeserializer

  • consumer.auto.offset.reset: What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted).

    • Valid values are (anything else will throw an exception to the consumer):

      • earliest: Automatically reset the offset to the earliest offset

      • latest: Automatically reset the offset to the latest offset

      • none: Throw exception to the consumer if no previous offset is found for the consumer’s group

    • This value will be used as the default value for the consumer.

    • If not provided, it will use the default value earliest

    • It can be overwritten in specific calls like consume(final String topic, final String group, final String offset)

KafkaClient config - Authentication

To access Secure Kafka clusters, the following authentication properties must be set:

  • security.protocol: Protocol used to communicate with brokers. Valid values are: SASL_PLAINTEXT, SASL_SSL

  • sasl.mechanism: Mechanism used for client connections. Valid values are: PLAIN, SCRAM-SHA-512

  • sasl.jaas.config: JAAS login context parameters for SASL connections.

    • For example:

      • org.apache.kafka.common.security.plain.PlainLoginModule required username='...' password='...';

      • org.apache.kafka.common.security.scram.ScramLoginModule required username='...' password='...';
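
For illustration, a SCRAM-secured environment could combine the properties above as follows (the credentials are placeholders, not real values):

security.protocol: SASL_SSL
sasl.mechanism: SCRAM-SHA-512
sasl.jaas.config: "org.apache.kafka.common.security.scram.ScramLoginModule required username='...' password='...';"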

Example
# Karate Tools - Kafka Clients Properties

# bootstrap.servers: Comma-separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers: localhost:39092

# Security Protocol: Protocol used to communicate with brokers. Valid values are: SASL_PLAINTEXT, SASL_SSL
security.protocol: SASL_PLAINTEXT

# SASL mechanism: Mechanism used for client connections. Valid values are: PLAIN, SCRAM-SHA-512
sasl.mechanism: PLAIN

# SASL JAAS Configuration: JAAS login context parameters for SASL connections.
# Example Values:
#   org.apache.kafka.common.security.plain.PlainLoginModule required username='...' password='...';
#   org.apache.kafka.common.security.scram.ScramLoginModule required username='...' password='...';
sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username='saslplain' password='saslplain-pwd';"

# schema.registry.url: Comma-separated list of URLs (protocol://host:port) for Schema Registry instances that can be used to register or look up schemas
schema.registry.url: http://localhost:38082

# Schema Registry Basic Auth Credentials Source: Specify how to pick the credentials for the Basic authentication header.
# The supported values are URL, USER_INFO and SASL_INHERIT
basic.auth.credentials.source: USER_INFO
# Schema Registry User Info Config: Specify the user info for the Basic authentication in the form of {username}:{password}.
basic.auth.user.info: "schemaregistry:schemaregistry-pwd"

#
# Kafka Producer properties - must start with "producer."
#
# producer.client.id: An id string to pass to the server when making requests.
producer.client.id: KARATELABSS

# producer.auto.register.schemas: Specify if the Serializer should attempt to register the Schema with Schema Registry
producer.auto.register.schemas: true

# producer.key.subject.name.strategy: Determines how to construct the subject name under which the key schema is registered with Schema Registry.
# producer.key.subject.name.strategy: TO_BE_COMPLETED

# producer.value.subject.name.strategy: Determines how to construct the subject name under which the value schema is registered with Schema Registry.
# producer.value.subject.name.strategy: TO_BE_COMPLETED

# producer.key.serializer: Serializer class for key that implements the 'org.apache.kafka.common.serialization.Serializer' interface
# producer.key.serializer Example Values:
#   org.apache.kafka.common.serialization.LongSerializer
#   org.apache.kafka.common.serialization.StringSerializer
producer.key.serializer: org.apache.kafka.common.serialization.StringSerializer

# producer.value.serializer: Serializer class for value that implements the 'org.apache.kafka.common.serialization.Serializer' interface
# producer.value.serializer Example Values:
#   org.apache.kafka.common.serialization.StringSerializer
#   io.confluent.kafka.serializers.KafkaAvroSerializer
producer.value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

#
# Kafka Consumer properties - must start with "consumer."
#

# A unique string that identifies the consumer group this consumer belongs to
# This value will be used as default group for the consumer
# It can be overwritten in specific calls like consume(final String topic, final String group)
consumer.group.id: KARATETOOLS-local

# Deserializer class for key that implements the 'org.apache.kafka.common.serialization.Deserializer' interface
# Deserializer class for key Example Values:
#   org.apache.kafka.common.serialization.LongDeserializer
#   org.apache.kafka.common.serialization.StringDeserializer
consumer.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer

# Deserializer class for value that implements the 'org.apache.kafka.common.serialization.Deserializer' interface
# Deserializer class for value Example Values:
#   org.apache.kafka.common.serialization.StringDeserializer
#   io.confluent.kafka.serializers.KafkaAvroDeserializer
consumer.value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

Client Features and Usage - Producer

Instantiate KafkaProducerClient

Creates a new instance of the KafkaProducerClient, providing the configuration as a map loaded from a YAML file.

Java Signature
public KafkaProducerClient(final Map<Object, Object> config)
Gherkin Usage
# public KafkaProducerClient(final Map<Object, Object> config)
# Instantiate KafkaProducerClient
Given def config = read('classpath:config/kafka/kafka-config-' + karate.env + '.yml')
Given def KafkaProducerClient = Java.type('dev.inditex.karate.kafka.KafkaProducerClient')
Given def kafkaProducerClient = new KafkaProducerClient(config)

Check if Kafka is available

Checks if the Kafka connection can be established

Returns true if the connection is available, false otherwise

Java Signature
public Boolean available()
Gherkin Usage
# public Boolean available()
When def producerAvailable = kafkaProducerClient.available()
Then if (!producerAvailable) karate.fail('Kafka Client not available')

Send a record to a topic

Send a record to a topic, with or without headers.

The record being sent is a Java instance of the corresponding business event.

For example:

Java Event
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def event = new KarateEvent("A1", "karate-A1", 11)

Send a record to a topic without headers

Java Signature
public void send(final String topic, final Object event)
Gherkin Usage - AVRO Serializer/Deserializer
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def eventA1 = new KarateEvent("A1", "karate-A1", 11)

# public void send(final String topic, final Object event)
When kafkaProducerClient.send(topicA, eventA1)
Gherkin Usage - String Serializer/Deserializer
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"
Given string eventA1 = { "id": "A1", "name": "karate-A1", "value": 11 }

# public void send(final String topic, final Object event)
When kafkaProducerClient.send(topicA, eventA1)

Send a record to a topic with Headers

Java Signature
public void send(final String topic, final Object event, final Map<String, List<String>> headers)
Gherkin Usage
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"
Given def headersA = { "contentType": [ "application/*+avro" ], "status": [ "A" ] }
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def eventA2 = new KarateEvent("A2", "karate-A2", 12)

# public void send(final String topic, final Object event, final Map<String, List<String>> headers)
When kafkaProducerClient.send(topicA, eventA2, headersA)

Client Features and Usage - Consumer

Instantiate KafkaConsumerClient

Creates a new instance of the KafkaConsumerClient, providing the configuration as a map loaded from a YAML file.

Java Signature
public KafkaConsumerClient(final Map<Object, Object> config)
Gherkin Usage
# public KafkaConsumerClient(final Map<Object, Object> config)
# Instantiate KafkaConsumerClient
Given def config = read('classpath:config/kafka/kafka-config-' + karate.env + '.yml')
Given def KafkaConsumerClient = Java.type('dev.inditex.karate.kafka.KafkaConsumerClient')
Given def kafkaConsumerClient = new KafkaConsumerClient(config)

Check if Kafka is available

Checks if the Kafka connection can be established

Returns true if the connection is available, false otherwise

Java Signature
public Boolean available()
Gherkin Usage
# public Boolean available()
When def consumerAvailable = kafkaConsumerClient.available()
Then if (!consumerAvailable) karate.fail('Kafka Client not available')
Gherkin Usage - Both clients availability check
# public Boolean available()
When def consumerAvailable = kafkaConsumerClient.available()
When def producerAvailable = kafkaProducerClient.available()
Then if (!consumerAvailable || !producerAvailable) karate.fail('Kafka Client not available')

Consume events from one topic

Consume events from a specific topic, optionally specifying group, offset and timeout.

When the timeout is not provided, the client default (5000 ms) is used.

Returns a JSON Array representing the obtained messages, where each row is a map << field name, message value >>

For example:

[
  { "id": "A1", "name": "karate-A1", "value": 11 },
  { "id": "A2", "name": "karate-A2", "value": 12 }
]

Consume events from one topic

Java Signature
public List<Map<String, Object>> consume(final String topic)
Gherkin Usage - AVRO Serializer/Deserializer
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"

Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')

Given def eventA1 = new KarateEvent("A1", "karate-A1", 11)
Given def eventA2 = new KarateEvent("A2", "karate-A2", 12)

# public List<Map<String, Object>> consume(final String topic)
When def messagesTopicA = kafkaConsumerClient.consume(topicA)
Then karate.log('messagesTopicA=', messagesTopicA)
Then match messagesTopicA == '#[2]'
Then match messagesTopicA[0] == karate.toJson(eventA1)
Then match messagesTopicA[1] == karate.toJson(eventA2)
Gherkin Usage - String Serializer/Deserializer
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A-String"

Given string eventA1 = { "id": "A1", "name": "karate-A1", "value": 11 }
Given string eventA2 = { "id": "A2", "name": "karate-A2", "value": 12 }

# public List<Map<String, Object>> consume(final String topic)
When def messagesTopicA = kafkaConsumerClient.consume(topicA)
Then karate.log('messagesTopicA=', messagesTopicA)
Then match messagesTopicA == '#[2]'
Then match messagesTopicA[0] == karate.fromString(eventA1)
Then match messagesTopicA[1] == karate.fromString(eventA2)

Consume events from one topic with timeout

Java Signature
public List<Map<String, Object>> consume(final String topic, final long timeout)
Gherkin Usage
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"

Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')

Given def eventA3 = new KarateEvent("A3", "karate-A3", 13)
Given def eventA4 = new KarateEvent("A4", "karate-A4", 14)

# public List<Map<String, Object>> consume(final String topic, final long timeout)
When def messagesTopicA = kafkaConsumerClient.consume(topicA, 2000)
Then karate.log('messagesTopicA=', messagesTopicA)
Then match messagesTopicA == '#[2]'
Then match messagesTopicA[0] == karate.toJson(eventA3)
Then match messagesTopicA[1] == karate.toJson(eventA4)

Consume events from one topic with group

Java Signature
public List<Map<String, Object>> consume(final String topic, final String group)
Gherkin Usage
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"

Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')

Given def eventA1 = new KarateEvent("A1", "karate-A1", 11)
Given def eventA2 = new KarateEvent("A2", "karate-A2", 12)
Given def eventA3 = new KarateEvent("A3", "karate-A3", 13)
Given def eventA4 = new KarateEvent("A4", "karate-A4", 14)

Given def group1 = "karate-kafka-consumer-group-1"

# public List<Map<String, Object>> consume(final String topic, final String group)
When def messagesTopicA = kafkaConsumerClient.consume(topicA, group1)
Then karate.log('messagesTopicA=', messagesTopicA)
Then match messagesTopicA == '#[4]'
Then match messagesTopicA[0] == karate.toJson(eventA1)
Then match messagesTopicA[1] == karate.toJson(eventA2)
Then match messagesTopicA[2] == karate.toJson(eventA3)
Then match messagesTopicA[3] == karate.toJson(eventA4)

Consume events from one topic with group and offset

Java Signature
public List<Map<String, Object>> consume(final String topic, final String group, final String offset)
Gherkin Usage
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"

Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')

Given def eventA1 = new KarateEvent("A1", "karate-A1", 11)
Given def eventA2 = new KarateEvent("A2", "karate-A2", 12)
Given def eventA3 = new KarateEvent("A3", "karate-A3", 13)
Given def eventA4 = new KarateEvent("A4", "karate-A4", 14)

Given def group2 = "karate-kafka-consumer-group-2"

# public List<Map<String, Object>> consume(final String topic, final String group, final String offset)
When def messagesTopicA = kafkaConsumerClient.consume(topicA, group2, 'earliest')
Then karate.log('messagesTopicA=', messagesTopicA)
Then match messagesTopicA == '#[4]'
Then match messagesTopicA[0] == karate.toJson(eventA1)
Then match messagesTopicA[1] == karate.toJson(eventA2)
Then match messagesTopicA[2] == karate.toJson(eventA3)
Then match messagesTopicA[3] == karate.toJson(eventA4)

Consume events from one topic with group and timeout

Java Signature
public List<Map<String, Object>> consume(final String topic, final String group, final long timeout)
Gherkin Usage
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"

Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')

Given def eventA1 = new KarateEvent("A1", "karate-A1", 11)
Given def eventA2 = new KarateEvent("A2", "karate-A2", 12)
Given def eventA3 = new KarateEvent("A3", "karate-A3", 13)
Given def eventA4 = new KarateEvent("A4", "karate-A4", 14)

Given def group3 = "karate-kafka-consumer-group-3"

# public List<Map<String, Object>> consume(final String topic, final String group, final long timeout)
When def messagesTopicA = kafkaConsumerClient.consume(topicA, group3, 3000)
Then karate.log('messagesTopicA=', messagesTopicA)
Then match messagesTopicA == '#[4]'
Then match messagesTopicA[0] == karate.toJson(eventA1)
Then match messagesTopicA[1] == karate.toJson(eventA2)
Then match messagesTopicA[2] == karate.toJson(eventA3)
Then match messagesTopicA[3] == karate.toJson(eventA4)

Consume events from one topic with timeout, group and offset

Java Signature
public List<Map<String, Object>> consume(final String topic, final String group, final String offset, final long timeout)
Gherkin Usage
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicA = "dev.inditex.karate.kafka.public.A"

Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')

Given def eventA1 = new KarateEvent("A1", "karate-A1", 11)
Given def eventA2 = new KarateEvent("A2", "karate-A2", 12)
Given def eventA3 = new KarateEvent("A3", "karate-A3", 13)
Given def eventA4 = new KarateEvent("A4", "karate-A4", 14)

Given def group4 = "karate-kafka-consumer-group-4"

# public List<Map<String, Object>> consume(final String topic, final String group, final String offset, final long timeout)
When def messagesTopicA = kafkaConsumerClient.consume(topicA, group4, 'latest', 3000)
Then karate.log('messagesTopicA=', messagesTopicA)
Then match messagesTopicA == '#[4]'
Then match messagesTopicA[0] == karate.toJson(eventA1)
Then match messagesTopicA[1] == karate.toJson(eventA2)
Then match messagesTopicA[2] == karate.toJson(eventA3)
Then match messagesTopicA[3] == karate.toJson(eventA4)

Consume events from multiple topics

Consume events from multiple topics, optionally specifying group, offset and timeout.

When the timeout is not provided, the client default (5000 ms) is used.

Returns a JSON Map where the keys are the topics and the values are JSON Arrays representing the obtained events, where each row is a map << field name, message value >>

For example:

{
  "dev.inditex.karate.kafka.public.A": [
    { "id": "A1", "name": "karate-A1", "value": 11 },
    { "id": "A2", "name": "karate-A2", "value": 12 }
  ],
  "dev.inditex.karate.kafka.public.B": [
    { "id": "B1", "name": "karate-B1", "value": 21 },
    { "id": "B2", "name": "karate-B2", "value": 22 }
  ]
}

Consume events from multiple topics

Java Signature
public Map<String, List<Map<String, Object>>> consume(final String[] topics)
Gherkin Usage
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicC = "dev.inditex.karate.kafka.public.C"
Given def topicD = "dev.inditex.karate.kafka.public.D"
Given def topics = [ "#(topicC)", "#(topicD)" ]

Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')

Given def eventC1 = new KarateEvent("C1", "karate-C1", 31)
Given def eventC2 = new KarateEvent("C2", "karate-C2", 32)

Given def eventD1 = new KarateEvent("D1", "karate-D1", 41)
Given def eventD2 = new KarateEvent("D2", "karate-D2", 42)

# public Map<String, List<Map<String, Object>>> consume(final String[] topics)
When def messagesTopics = kafkaConsumerClient.consume(topics)
Then karate.log('messagesTopics=', messagesTopics)
Then def messagesTopicC = karate.jsonPath(messagesTopics, "$['" + topicC + "']")
Then karate.log('messagesTopicC=', messagesTopicC)
Then match messagesTopicC == '#[2]'
Then match messagesTopicC[0] == karate.toJson(eventC1)
Then match messagesTopicC[1] == karate.toJson(eventC2)
Then def messagesTopicD = karate.jsonPath(messagesTopics, "$['" + topicD + "']")
Then karate.log('messagesTopicD=', messagesTopicD)
Then match messagesTopicD == '#[2]'
Then match messagesTopicD[0] == karate.toJson(eventD1)
Then match messagesTopicD[1] == karate.toJson(eventD2)

Consume events from multiple topics with timeout

Java Signature
public Map<String, List<Map<String, Object>>> consume(final String[] topics, final long timeout)
Gherkin Usage
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicC = "dev.inditex.karate.kafka.public.C"
Given def topicD = "dev.inditex.karate.kafka.public.D"
Given def topics = [ "#(topicC)", "#(topicD)" ]

Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')

Given def eventC3 = new KarateEvent("C3", "karate-C3", 33)
Given def eventC4 = new KarateEvent("C4", "karate-C4", 34)

Given def eventD3 = new KarateEvent("D3", "karate-D3", 43)
Given def eventD4 = new KarateEvent("D4", "karate-D4", 44)

# public Map<String, List<Map<String, Object>>> consume(final String[] topics, final long timeout)
When def messagesTopics = kafkaConsumerClient.consume(topics, 2000)
Then karate.log('messagesTopics=', messagesTopics)
Then def messagesTopicC = karate.jsonPath(messagesTopics, "$['" + topicC + "']")
Then karate.log('messagesTopicC=', messagesTopicC)
Then match messagesTopicC == '#[2]'
Then match messagesTopicC[0] == karate.toJson(eventC3)
Then match messagesTopicC[1] == karate.toJson(eventC4)
Then def messagesTopicD = karate.jsonPath(messagesTopics, "$['" + topicD + "']")
Then karate.log('messagesTopicD=', messagesTopicD)
Then match messagesTopicD == '#[2]'
Then match messagesTopicD[0] == karate.toJson(eventD3)
Then match messagesTopicD[1] == karate.toJson(eventD4)

Consume events from multiple topics with group

Java Signature
public Map<String, List<Map<String, Object>>> consume(final String[] topics, final String group)
Gherkin Usage
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicC = "dev.inditex.karate.kafka.public.C"
Given def topicD = "dev.inditex.karate.kafka.public.D"
Given def topics = [ "#(topicC)", "#(topicD)" ]

Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')

Given def eventC1 = new KarateEvent("C1", "karate-C1", 31)
Given def eventC2 = new KarateEvent("C2", "karate-C2", 32)
Given def eventC3 = new KarateEvent("C3", "karate-C3", 33)
Given def eventC4 = new KarateEvent("C4", "karate-C4", 34)

Given def eventD1 = new KarateEvent("D1", "karate-D1", 41)
Given def eventD2 = new KarateEvent("D2", "karate-D2", 42)
Given def eventD3 = new KarateEvent("D3", "karate-D3", 43)
Given def eventD4 = new KarateEvent("D4", "karate-D4", 44)

Given def group1 = "karate-kafka-consumer-group-1"

# public Map<String, List<Map<String, Object>>> consume(final String[] topics, final String group)
When def messagesTopics = kafkaConsumerClient.consume(topics, group1)
Then karate.log('messagesTopics=', messagesTopics)
Then def messagesTopicC = karate.jsonPath(messagesTopics, "$['" + topicC + "']")
Then karate.log('messagesTopicC=', messagesTopicC)
Then match messagesTopicC == '#[4]'
Then match messagesTopicC[0] == karate.toJson(eventC1)
Then match messagesTopicC[1] == karate.toJson(eventC2)
Then match messagesTopicC[2] == karate.toJson(eventC3)
Then match messagesTopicC[3] == karate.toJson(eventC4)
Then def messagesTopicD = karate.jsonPath(messagesTopics, "$['" + topicD + "']")
Then karate.log('messagesTopicD=', messagesTopicD)
Then match messagesTopicD == '#[4]'
Then match messagesTopicD[0] == karate.toJson(eventD1)
Then match messagesTopicD[1] == karate.toJson(eventD2)
Then match messagesTopicD[2] == karate.toJson(eventD3)
Then match messagesTopicD[3] == karate.toJson(eventD4)

Consume events from multiple topics with group and offset

Java Signature
public Map<String, List<Map<String, Object>>> consume(final String[] topics, final String group, final String offset)
Gherkin Usage
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicC = "dev.inditex.karate.kafka.public.C"
Given def topicD = "dev.inditex.karate.kafka.public.D"
Given def topics = [ "#(topicC)", "#(topicD)" ]

Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')

Given def eventC1 = new KarateEvent("C1", "karate-C1", 31)
Given def eventC2 = new KarateEvent("C2", "karate-C2", 32)
Given def eventC3 = new KarateEvent("C3", "karate-C3", 33)
Given def eventC4 = new KarateEvent("C4", "karate-C4", 34)

Given def eventD1 = new KarateEvent("D1", "karate-D1", 41)
Given def eventD2 = new KarateEvent("D2", "karate-D2", 42)
Given def eventD3 = new KarateEvent("D3", "karate-D3", 43)
Given def eventD4 = new KarateEvent("D4", "karate-D4", 44)

Given def group2 = "karate-kafka-consumer-group-2"

# public Map<String, List<Map<String, Object>>> consume(final String[] topics, final String group, final String offset)
When def messagesTopics = kafkaConsumerClient.consume(topics, group2, 'earliest')
Then karate.log('messagesTopics=', messagesTopics)
Then def messagesTopicC = karate.jsonPath(messagesTopics, "$['" + topicC + "']")
Then karate.log('messagesTopicC=', messagesTopicC)
Then match messagesTopicC == '#[4]'
Then match messagesTopicC[0] == karate.toJson(eventC1)
Then match messagesTopicC[1] == karate.toJson(eventC2)
Then match messagesTopicC[2] == karate.toJson(eventC3)
Then match messagesTopicC[3] == karate.toJson(eventC4)
Then def messagesTopicD = karate.jsonPath(messagesTopics, "$['" + topicD + "']")
Then karate.log('messagesTopicD=', messagesTopicD)
Then match messagesTopicD == '#[4]'
Then match messagesTopicD[0] == karate.toJson(eventD1)
Then match messagesTopicD[1] == karate.toJson(eventD2)
Then match messagesTopicD[2] == karate.toJson(eventD3)
Then match messagesTopicD[3] == karate.toJson(eventD4)

Consume events from multiple topics with group and timeout

Java Signature
public Map<String, List<Map<String, Object>>> consume(final String[] topics, final String group, final long timeout)
Gherkin Usage
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicC = "dev.inditex.karate.kafka.public.C"
Given def topicD = "dev.inditex.karate.kafka.public.D"
Given def topics = [ "#(topicC)", "#(topicD)" ]

Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')

Given def eventC1 = new KarateEvent("C1", "karate-C1", 31)
Given def eventC2 = new KarateEvent("C2", "karate-C2", 32)
Given def eventC3 = new KarateEvent("C3", "karate-C3", 33)
Given def eventC4 = new KarateEvent("C4", "karate-C4", 34)

Given def eventD1 = new KarateEvent("D1", "karate-D1", 41)
Given def eventD2 = new KarateEvent("D2", "karate-D2", 42)
Given def eventD3 = new KarateEvent("D3", "karate-D3", 43)
Given def eventD4 = new KarateEvent("D4", "karate-D4", 44)

Given def group3 = "karate-kafka-consumer-group-3"

# public Map<String, List<Map<String, Object>>> consume(final String[] topics, final String group, final long timeout)
When def messagesTopics = kafkaConsumerClient.consume(topics, group3, 3000)
Then karate.log('messagesTopics=', messagesTopics)
Then def messagesTopicC = karate.jsonPath(messagesTopics, "$['" + topicC + "']")
Then karate.log('messagesTopicC=', messagesTopicC)
Then match messagesTopicC == '#[4]'
Then match messagesTopicC[0] == karate.toJson(eventC1)
Then match messagesTopicC[1] == karate.toJson(eventC2)
Then match messagesTopicC[2] == karate.toJson(eventC3)
Then match messagesTopicC[3] == karate.toJson(eventC4)
Then def messagesTopicD = karate.jsonPath(messagesTopics, "$['" + topicD + "']")
Then karate.log('messagesTopicD=', messagesTopicD)
Then match messagesTopicD == '#[4]'
Then match messagesTopicD[0] == karate.toJson(eventD1)
Then match messagesTopicD[1] == karate.toJson(eventD2)
Then match messagesTopicD[2] == karate.toJson(eventD3)
Then match messagesTopicD[3] == karate.toJson(eventD4)

Consume events from multiple topics with timeout, group and offset

Java Signature
public Map<String, List<Map<String, Object>>> consume(final String[] topics, final String group, final String offset, final long timeout)
Gherkin Usage
# Define Consumer Topics, Headers, Events and Consumer Groups
Given def topicC = "dev.inditex.karate.kafka.public.C"
Given def topicD = "dev.inditex.karate.kafka.public.D"
Given def topics = [ "#(topicC)", "#(topicD)" ]

Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')

Given def eventC1 = new KarateEvent("C1", "karate-C1", 31)
Given def eventC2 = new KarateEvent("C2", "karate-C2", 32)
Given def eventC3 = new KarateEvent("C3", "karate-C3", 33)
Given def eventC4 = new KarateEvent("C4", "karate-C4", 34)

Given def eventD1 = new KarateEvent("D1", "karate-D1", 41)
Given def eventD2 = new KarateEvent("D2", "karate-D2", 42)
Given def eventD3 = new KarateEvent("D3", "karate-D3", 43)
Given def eventD4 = new KarateEvent("D4", "karate-D4", 44)

Given def group4 = "karate-kafka-consumer-group-4"

# public Map<String, List<Map<String, Object>>> consume(final String[] topics, final String group, final String offset, final long timeout)
When def messagesTopics = kafkaConsumerClient.consume(topics, group4, 'latest', 3000)
Then karate.log('messagesTopics=', messagesTopics)
Then def messagesTopicC = karate.jsonPath(messagesTopics, "$['" + topicC + "']")
Then karate.log('messagesTopicC=', messagesTopicC)
Then match messagesTopicC == '#[4]'
Then match messagesTopicC[0] == karate.toJson(eventC1)
Then match messagesTopicC[1] == karate.toJson(eventC2)
Then match messagesTopicC[2] == karate.toJson(eventC3)
Then match messagesTopicC[3] == karate.toJson(eventC4)
Then def messagesTopicD = karate.jsonPath(messagesTopics, "$['" + topicD + "']")
Then karate.log('messagesTopicD=', messagesTopicD)
Then match messagesTopicD == '#[4]'
Then match messagesTopicD[0] == karate.toJson(eventD1)
Then match messagesTopicD[1] == karate.toJson(eventD2)
Then match messagesTopicD[2] == karate.toJson(eventD3)
Then match messagesTopicD[3] == karate.toJson(eventD4)
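
As a minimal end-to-end sketch, the producer and consumer clients shown above can be combined in a single scenario; the topic name and event values below are illustrative:

# Instantiate both clients from the same environment configuration
Given def config = read('classpath:config/kafka/kafka-config-' + karate.env + '.yml')
Given def KafkaProducerClient = Java.type('dev.inditex.karate.kafka.KafkaProducerClient')
Given def KafkaConsumerClient = Java.type('dev.inditex.karate.kafka.KafkaConsumerClient')
Given def kafkaProducerClient = new KafkaProducerClient(config)
Given def kafkaConsumerClient = new KafkaConsumerClient(config)

# Check availability of both clients
When def consumerAvailable = kafkaConsumerClient.available()
When def producerAvailable = kafkaProducerClient.available()
Then if (!consumerAvailable || !producerAvailable) karate.fail('Kafka Client not available')

# Produce an event and consume it back from the same topic
Given def topicA = "dev.inditex.karate.kafka.public.A"
Given def KarateEvent = Java.type('dev.inditex.karate.kafka.KarateEvent')
Given def eventA1 = new KarateEvent("A1", "karate-A1", 11)
When kafkaProducerClient.send(topicA, eventA1)
When def messagesTopicA = kafkaConsumerClient.consume(topicA, 2000)
Then match messagesTopicA contains karate.toJson(eventA1)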