Kafka Extension

The Kafka extension enables reading from and writing to Kafka topics in Grainite applications.

Setup

To use the Kafka extension, first add it as a dependency in your application's pom.xml file.

...
<dependency>
  <groupId>ext.grainite</groupId>
  <artifactId>grainite-kafka</artifactId>
  <version>{GRAINITE-VERSION}</version>
</dependency>
...

Replace {GRAINITE-VERSION} with the same Grainite version you use for libgrainite (the Grainite client library for Java).

Contents

The Kafka Extension includes:

  • KafkaReaderTask: Task that continuously reads from a given Kafka topic. The partitions of the source topic are read in parallel.

  • KafkaWriterHandler: Handler that writes its input payload to one or more Kafka topics.

KafkaReaderTask

Note: Only Kafka topics that contain JSON / String payloads are currently supported.

Usage

To include this task in your application, you must specify the taskClass ext.grainite.tasks.kafkareader.KafkaReaderTask and taskInstanceClass ext.grainite.tasks.kafkareader.KafkaReaderTaskInstance in your application's configuration YAML file.

app.yaml
...
tasks:
  - task_name: my_kafka_reader_task
    taskClass: ext.grainite.tasks.kafkareader.KafkaReaderTask
    taskInstanceClass: ext.grainite.tasks.kafkareader.KafkaReaderTaskInstance
    config:
...

Below are the configuration options that can be passed in under config:

| Property | Required? | Value | Description |
|---|---|---|---|
| bootstrap.servers | REQUIRED | Example: localhost:9092 | Kafka bootstrap server setting (can be found in the server.properties file for Kafka). |
| topic | REQUIRED | Example: my_kafka_topic | Name of the Kafka topic to read from. |
| key.deserializer | REQUIRED | For String: org.apache.kafka.common.serialization.StringDeserializer | Deserializer class used to deserialize the key of each record. |
| value.deserializer | REQUIRED | For String: org.apache.kafka.common.serialization.StringDeserializer; for Avro: io.confluent.kafka.serializers.KafkaAvroDeserializer; for Protobuf: io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer | Deserializer class used to deserialize the value of each record. |
| schema.registry.url | Optional | Example: http://localhost:8081 | URL of the schema registry used to look up schemas. |
| output.topic | Optional | Example: my_events | Grainite topic to emit the output of this task to. |
| output.table | Optional | Example: my_table | Grainite table to emit the output of this task to via Grain message (output.table_action must also be provided). |
| output.table_action | Optional | Example: my_table_action | Action to invoke, via Grain message, on the table specified in output.table. |
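
As an illustration of how these options fit together, a filled-in task entry might look like the sketch below. It assumes each option is written as a flat key directly under config; the task name, broker address, and topic names are placeholders rather than values from a real deployment.

```yaml
tasks:
  # Hypothetical task name; any name that fits your application works.
  - task_name: my_kafka_reader_task
    taskClass: ext.grainite.tasks.kafkareader.KafkaReaderTask
    taskInstanceClass: ext.grainite.tasks.kafkareader.KafkaReaderTaskInstance
    config:
      # Assumed layout: each option from the table is a flat key under config.
      bootstrap.servers: localhost:9092
      topic: my_kafka_topic
      key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Emit each record that is read to a Grainite topic.
      output.topic: my_events
      # To deliver records to a table action instead, set both keys below.
      # Whether they can be combined with output.topic is not stated here.
      # output.table: my_table
      # output.table_action: my_table_action
```

The String deserializers are used here because they match the JSON / String payloads noted above.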

KafkaWriterHandler

Note: Only Grainite topics that contain JSON payloads are currently supported.

Usage

To include this handler in your application, you must specify the class_name ext.grainite.handlers.KafkaWriterHandler in your application's configuration YAML file.

app.yaml
...
tables:
  - table_name: my_kafka_writer
    key_type: string
    action_handlers:
      - name: my_kafka_writer
        type: java
        class_name: ext.grainite.handlers.KafkaWriterHandler
        config:
...

Below are the configuration options that can be passed in under config:

| Property | Required? | Value | Description |
|---|---|---|---|
| bootstrap.servers | REQUIRED | Example: localhost:9092 | Kafka bootstrap server setting (can be found in the server.properties file for Kafka). |
| output.topic | REQUIRED | Example: microservice-topic | Name of the Kafka topic to write to. |
| key.field | REQUIRED | Example: destinationMicroservice | Field in the payload that will be used as the key of the generated event. |
| topicnames.field | Optional | Example: topicsToWriteTo | Field in the payload that lists the Kafka topic(s) to write the payload to, with topic names separated by ';'. If this field is populated in the payload, the payload is written to the listed topic(s), overriding the topic configured in output.topic. |
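
To make the interaction between output.topic, key.field, and topicnames.field concrete, the sketch below shows a filled-in handler entry together with a hypothetical payload. It assumes the options are written as flat keys under config; the payload and the topic names billing-topic and audit-topic are invented for illustration.

```yaml
tables:
  - table_name: my_kafka_writer
    key_type: string
    action_handlers:
      - name: my_kafka_writer
        type: java
        class_name: ext.grainite.handlers.KafkaWriterHandler
        config:
          # Assumed layout: each option from the table is a flat key under config.
          bootstrap.servers: localhost:9092
          output.topic: microservice-topic
          key.field: destinationMicroservice
          topicnames.field: topicsToWriteTo

# Hypothetical JSON payload sent to the action above:
#   {"destinationMicroservice": "billing",
#    "topicsToWriteTo": "billing-topic;audit-topic",
#    "amount": 42}
#
# Reading the option descriptions literally, this payload would be keyed by
# "billing" and written to billing-topic and audit-topic, not to
# microservice-topic. A payload without "topicsToWriteTo" would be written
# to microservice-topic.
```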

Kafka is a trademark of the Apache Software Foundation
