Apache Kafka Streamer
Overview
The Apache Ignite Kafka Streamer module provides streaming from Kafka to an Ignite cache. Either of the following two methods can be used to achieve such streaming:
- using Kafka Connect functionality with the Ignite sink
- importing the Kafka Streamer module in your Maven project and instantiating KafkaStreamer for data streaming
Streaming Data via Kafka Connect
IgniteSinkConnector will help you export data from Kafka to an Ignite cache by polling data from Kafka topics and writing
it to your specified cache. The connector can be found in the optional/ignite-kafka module. It and its dependencies
have to be on the classpath of a running Kafka instance, as described in the following subsection. For more information
on Kafka Connect, see the Kafka documentation.
Setting up and Running
- Add the IGNITE_HOME/libs/optional/ignite-kafka module to the application classpath.
- Prepare worker configurations, e.g.,

  bootstrap.servers=localhost:9092

  key.converter=org.apache.kafka.connect.storage.StringConverter
  value.converter=org.apache.kafka.connect.storage.StringConverter
  key.converter.schemas.enable=false
  value.converter.schemas.enable=false

  internal.key.converter=org.apache.kafka.connect.storage.StringConverter
  internal.value.converter=org.apache.kafka.connect.storage.StringConverter
  internal.key.converter.schemas.enable=false
  internal.value.converter.schemas.enable=false

  offset.storage.file.filename=/tmp/connect.offsets
  offset.flush.interval.ms=10000
- Prepare connector configurations, e.g.,

  # connector
  name=my-ignite-connector
  connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
  tasks.max=2
  topics=someTopic1,someTopic2

  # cache
  cacheName=myCache
  cacheAllowOverwrite=true
  igniteCfg=/some-path/ignite.xml
  singleTupleExtractorCls=my.company.MyExtractor
  where
  - cacheName is the name of the cache you specify in /some-path/ignite.xml; the data from someTopic1, someTopic2 will be pulled and stored there.
  - cacheAllowOverwrite can be set to true if you want to enable overwriting existing values in the cache.
  - If you need to parse the incoming data and decide on your new key and value, you can implement it as a StreamSingleTupleExtractor and specify it as singleTupleExtractorCls (see the sketch after this list).
  - You can also set cachePerNodeDataSize and cachePerNodeParOps to adjust the per-node buffer size and the maximum number of parallel stream operations for a single node.
- Start the connector, for instance, in standalone mode as follows,

  bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
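For reference, here is a minimal sketch of what the extractor class configured above might look like (my.company.MyExtractor is just the placeholder name from the connector configuration). It assumes the sink hands Kafka Connect SinkRecord instances to the extractor and that keys and values are plain strings, in line with the StringConverter settings in the worker configuration; adjust the types to your data.

package my.company;

import java.util.AbstractMap;
import java.util.Map;

import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical extractor: turns each Kafka Connect record into a single cache entry.
public class MyExtractor implements StreamSingleTupleExtractor<SinkRecord, String, String> {
    @Override public Map.Entry<String, String> extract(SinkRecord record) {
        // Assumes string keys and values (StringConverter is configured in the worker properties).
        return new AbstractMap.SimpleEntry<>(String.valueOf(record.key()), String.valueOf(record.value()));
    }
}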
Checking the Flow
To perform a very basic functionality check, you can do the following,
- Start ZooKeeper

  bin/zookeeper-server-start.sh config/zookeeper.properties

- Start the Kafka server

  bin/kafka-server-start.sh config/server.properties

- Provide some data input to the Kafka server (start the console producer and type a key-value pair such as k1,v1)

  bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=,
  k1,v1

- Start the connector

  bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties

- Check that the value is in the cache. For example, via the REST API,

  http://node1:8080/ignite?cmd=size&cacheName=cache1
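From the command line, the same check can be done with curl, assuming the node has the HTTP REST API enabled (this requires the ignite-rest-http module on the node's classpath):

# number of entries in the cache
curl "http://node1:8080/ignite?cmd=size&cacheName=cache1"
# optionally, fetch the entry stored under the key k1 produced above
curl "http://node1:8080/ignite?cmd=get&cacheName=cache1&key=k1"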
Streaming Data with the Ignite Kafka Streamer Module
If you are using Maven to manage the dependencies of your project, you will first have to add the Kafka Streamer module
dependency like this (replace ${ignite-kafka-ext.version} with the actual Ignite Kafka Extension version you are interested in):
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
...
<dependencies>
...
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-kafka-ext</artifactId>
<version>${ignite-kafka-ext.version}</version>
</dependency>
...
</dependencies>
...
</project>
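The streamer feeds an IgniteDataStreamer, so it needs a running Ignite node and a target cache. A minimal sketch of that setup, assuming the illustrative cache name "myCache" used below:

// Start an Ignite node (or obtain an existing one) and make sure the target cache exists.
Ignite ignite = Ignition.start();
ignite.getOrCreateCache("myCache");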
Assuming a cache with String keys and String values, the streamer can be started as follows:
KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();
IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache");
// allow overwriting cache data
stmr.allowOverwrite(true);
kafkaStreamer.setIgnite(ignite);
kafkaStreamer.setStreamer(stmr);
// set the topic
kafkaStreamer.setTopic(someKafkaTopic);
// set the number of threads to process Kafka streams
kafkaStreamer.setThreads(4);
// set Kafka consumer configurations
kafkaStreamer.setConsumerConfig(kafkaConsumerConfig);
// set extractor
kafkaStreamer.setSingleTupleExtractor(strExtractor);
kafkaStreamer.start();
...
// stop on shutdown
kafkaStreamer.stop();
stmr.close();
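The snippet above does not show how kafkaConsumerConfig and strExtractor are prepared. As a rough sketch, kafkaConsumerConfig holds standard Kafka consumer settings (illustrative values below; depending on the KafkaStreamer version, setConsumerConfig accepts these Properties directly or a ConsumerConfig built from them), while strExtractor is a StreamSingleTupleExtractor that maps each consumed Kafka record to a cache key-value pair, along the lines of the sink connector extractor sketched earlier.

// Standard Kafka consumer settings (java.util.Properties); values are illustrative.
Properties kafkaConsumerConfig = new Properties();
kafkaConsumerConfig.put("bootstrap.servers", "localhost:9092");
kafkaConsumerConfig.put("group.id", "ignite-kafka-streamer");
// String keys and values, matching the String-typed cache used above.
kafkaConsumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaConsumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");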
For detailed information on Kafka consumer properties, refer to http://kafka.apache.org/documentation.html
