Basics of ruby-kafka gem

Introduction

In the previous article, Kafka 1.0 Basics, we ran the shell scripts that ship with Kafka to produce and consume messages. In this article, we will see how easy it is to use the ruby-kafka gem to create a producer and a consumer that interact with Kafka from Ruby.

Versions

You can infer the versions from the string kafka_2.11-1.0.2:

Scala Version: 2.11
Kafka version: 1.0.2
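
The same split can be done in code. The following is a minimal sketch, assuming the name always follows the kafka_<scala>-<kafka> pattern seen above; parse_kafka_dist and DIST_PATTERN are hypothetical names used only for illustration.

```ruby
# Split a name such as "kafka_2.11-1.0.2" into its Scala and Kafka versions.
# `parse_kafka_dist` is a hypothetical helper, not part of any library.
DIST_PATTERN = /\Akafka_(?<scala>\d+(?:\.\d+)+)-(?<kafka>\d+(?:\.\d+)+)\z/

def parse_kafka_dist(name)
  match = DIST_PATTERN.match(name)
  raise ArgumentError, "unrecognized name: #{name}" unless match

  { scala: match[:scala], kafka: match[:kafka] }
end

versions = parse_kafka_dist("kafka_2.11-1.0.2")
puts "Scala version: #{versions[:scala]}"
puts "Kafka version: #{versions[:kafka]}"
```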

I am using version 0.7.4 of the ruby-kafka gem.

Produce Message

Let's run the example from the ruby-kafka home page:

require "kafka"

# The first argument is a list of "seed brokers" that will be queried for the full
# cluster topology. At least one of these *must* be available. `client_id` is
# used to identify this client in logs and metrics. It's optional but recommended.
kafka = Kafka.new(["kafka1:9092", "kafka2:9092"], client_id: "my-application")

This fails with the error:

$ ruby /vagrant/kafka.rb
/var/lib/gems/2.3.0/gems/ruby-kafka-0.7.4/lib/kafka/cluster.rb:396:in `fetch_cluster_info': Could not connect to any of the seed brokers: (Kafka::ConnectionError)
- kafka://kafka2:9092: getaddrinfo: Name or service not known
- kafka://kafka1:9092: getaddrinfo: Name or service not known

The host names kafka1 and kafka2 in the example do not resolve on this machine. Since the broker runs locally, we need to use localhost as the host name. Here is a simple Kafka producer:

require "kafka"

# The first argument is a list of "seed brokers" that will be queried for the full
# cluster topology. At least one of these *must* be available. `client_id` is
# used to identify this client in logs and metrics. It's optional but recommended.
kafka = Kafka.new(["localhost:9092"], client_id: "mykafka-app")
kafka.deliver_message("Hello, World!", topic: "greetings")

We now get a different error message.

$ ruby /vagrant/kafka.rb 
[2018-11-26 17:45:52,123] INFO Topic creation {"version":1,"partitions":{"0":[2]}} (kafka.admin.AdminUtils$)
[2018-11-26 17:45:52,124] INFO [KafkaApi-0] Auto creation of topic greetings with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
/var/lib/gems/2.3.0/gems/ruby-kafka-0.7.4/lib/kafka/protocol.rb:156:in `handle_error': Kafka::LeaderNotAvailable (Kafka::LeaderNotAvailable)
    from /var/lib/gems/2.3.0/gems/ruby-kafka-0.7.4/lib/kafka/protocol/metadata_response.rb:134:in `partitions_for'
    from /var/lib/gems/2.3.0/gems/ruby-kafka-0.7.4/lib/kafka/cluster.rb:145:in `partitions_for'
    from /var/lib/gems/2.3.0/gems/ruby-kafka-0.7.4/lib/kafka/client.rb:146:in `deliver_message'
    from /vagrant/kafka.rb:7:in `<main>'

We got past the host error. The greetings topic did not exist, and the broker log lines in the output show that it was auto-created with 1 partition and a replication factor of 1. Note that these lines come from kafka.server.KafkaApis, so it is the broker, not the gem, that created the topic. The call now fails with a Kafka::LeaderNotAvailable exception because no leader has been elected for the new partition yet. Wait a moment for a leader to be elected, then run kafka.rb again:

$ ruby /vagrant/kafka.rb
[2018-11-26 17:57:45,734] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: greetings-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)

You don't see any error messages. So, let's now consume the message we produced by running the simple consumer example from the README on the ruby-kafka home page.
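
As an aside on the LeaderNotAvailable failure above: instead of re-running the script by hand, a producer can retry the delivery a few times. The following is a minimal sketch of that idea, not something taken from the ruby-kafka README. The helper with_retries, its parameters and the delay values are assumptions made for illustration; the only gem name it relies on is the Kafka::LeaderNotAvailable class that appears in the stack trace.

```ruby
# Retry a block when it raises one of the given (transient) error classes.
# `with_retries` is a hypothetical helper, not part of ruby-kafka.
def with_retries(*error_classes, attempts: 5, base_delay: 0.5)
  tries = 0
  begin
    tries += 1
    yield tries
  rescue *error_classes => e
    # Give up and re-raise once the attempt budget is used.
    raise if tries >= attempts

    delay = base_delay * (2**(tries - 1)) # doubles on every failed attempt
    warn "#{e.class} (attempt #{tries}/#{attempts}), retrying in #{delay}s"
    sleep delay
    retry
  end
end

# Usage with the producer above (needs the gem and a running broker):
#
#   require "kafka"
#   kafka = Kafka.new(["localhost:9092"], client_id: "mykafka-app")
#   with_retries(Kafka::LeaderNotAvailable) do
#     kafka.deliver_message("Hello, World!", topic: "greetings")
#   end
```

Passing the error class as an argument keeps the helper independent of the gem, so it can be tried out without a broker.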

Consume Messages

Create consume.rb with:

require 'kafka'

kafka = Kafka.new(["localhost:9092"], client_id: "mykafka-app")

kafka.each_message(topic: "greetings") do |message|
  puts message.offset, message.key, message.value
end

We can see the message that was produced:

$ ruby /vagrant/consume.rb
0

Hello, World!

The 0 is the offset, the key is nil (printed as a blank line), and 'Hello, World!' is the message value. The program keeps polling for more messages and waits in the console. You can run the consumer in the background and produce more messages using the producer program. The consumer will pick them up and print them to standard output.
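
Because a nil key shows up only as a blank line, labelled output may be easier to read. The following is a small sketch of one way to format a message. FakeMessage is a hypothetical stand-in so the snippet runs without a broker; it models only the three readers used in consume.rb, and describe_message is likewise a made-up helper name.

```ruby
# Stand-in for the message object yielded by each_message; only offset,
# key and value are modelled. FakeMessage is hypothetical.
FakeMessage = Struct.new(:offset, :key, :value)

# Build one labelled line per message. `inspect` renders a nil key as "nil"
# instead of the blank line that `puts nil` prints.
def describe_message(message)
  "offset=#{message.offset} key=#{message.key.inspect} value=#{message.value.inspect}"
end

puts describe_message(FakeMessage.new(0, nil, "Hello, World!"))
# prints: offset=0 key=nil value="Hello, World!"
```

In consume.rb, the body of the each_message block could then become puts describe_message(message).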

Consumer Group

A consumer group provides several advantages; you can read about them in the README on the ruby-kafka home page. Create group.rb:

require "kafka"

kafka = Kafka.new(["localhost:9092"])

# Consumers with the same group id will form a Consumer Group together.
consumer = kafka.consumer(group_id: "my-group")

# It's possible to subscribe to multiple topics by calling `subscribe` repeatedly.
consumer.subscribe("greetings")

# Stop the consumer when the SIGTERM signal is sent to the process.
# It's better to shut down gracefully than to kill the process.
trap("TERM") { consumer.stop }

# This will loop indefinitely, yielding each message in turn.
consumer.each_message do |message|
  puts "Topic: #{message.topic}", "Partition: #{message.partition}"
  puts "Offset: #{message.offset}", "Key: #{message.key}", "Value: #{message.value}"
end

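You can now see the topic, partition, offset, key and value in the output. The message shown here is a later one (offset 2), produced with a different value than the first example: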

$ ruby /vagrant/group.rb
[2018-11-26 18:16:59,884] INFO [GroupCoordinator 0]: Preparing to rebalance group my-group with old generation 4 (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
[2018-11-26 18:16:59,885] INFO [GroupCoordinator 0]: Stabilized group my-group generation 5 (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
[2018-11-26 18:16:59,886] INFO [GroupCoordinator 0]: Assignment received from leader for group my-group for generation 5 (kafka.coordinator.group.GroupCoordinator)

Topic: greetings
Partition: 0
Offset: 2
Key: 
Value: Kafka, for All!
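
One advantage of a consumer group is that its members share the work: each partition of a subscribed topic is consumed by one member of the group at a time. The sketch below illustrates that idea with a simple round-robin split. It is plain Ruby written for illustration, not the assignment code used by ruby-kafka or the broker, and assign_round_robin and the member names are hypothetical.

```ruby
# Distribute partition numbers across group members in round-robin order.
# `assign_round_robin` is a hypothetical helper for illustration only.
def assign_round_robin(members, partitions)
  raise ArgumentError, "group has no members" if members.empty?

  assignment = members.each_with_object({}) { |member, h| h[member] = [] }
  partitions.each_with_index do |partition, i|
    assignment[members[i % members.size]] << partition
  end
  assignment
end

# Four partitions, two members: each member gets two partitions.
p assign_round_robin(%w[consumer-a consumer-b], [0, 1, 2, 3])

# One partition (like the auto-created greetings topic), two members:
# the second member is assigned nothing and stays idle.
p assign_round_robin(%w[consumer-a consumer-b], [0])
```

With the single-partition greetings topic used in this article, starting a second copy of group.rb would therefore not speed up consumption; the topic would need more partitions first.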

The ruby-kafka gem home page has more details about using Kafka and is a great resource to learn more.

