Using racecar to Consume Kafka Messages in Rails

In the bundle console, run the WaterDrop setup:

WaterDrop.setup do |config|
  config.deliver = true
  config.kafka.seed_brokers = %w[kafka://localhost:9092]
end

You can see that WaterDrop has default settings for the Kafka configuration:

{:client_id=>"waterdrop", :logger=>#<NullLogger:0x0055731e74f9c8>, :monitor=>#<WaterDrop::Instrumentation::Monitor:0x0055731e74e8c0 @id=:waterdrop, @clock=#<Dry::Monitor::Clock:0x0055731f08b370>, @__bus__=#<Dry::Events::Bus:0x0055731e74cd68 @listeners=#<Concurrent::Map:0x0055731e74d2e0 entries=0 default_proc=#<Proc:0x0055731f0cac50@/var/lib/gems/2.3.0/gems/dry-events-0.1.0/lib/dry/events/constants.rb:8>>, @events=#<Concurrent::Map:0x0055731e74d740 entries=4 default_proc=nil>>>, :deliver=>true, :raise_on_buffer_overflow=>true, :kafka=>{:seed_brokers=>["kafka://localhost:9092"], :connect_timeout=>10, :socket_timeout=>30, :max_buffer_bytesize=>10000000, :max_buffer_size=>1000, :max_queue_size=>1000, :ack_timeout=>5, :delivery_interval=>10, :delivery_threshold=>100, :max_retries=>2, :required_acks=>-1, :retry_backoff=>1, :compression_threshold=>1, :compression_codec=>nil, :ssl_ca_cert=>nil, :ssl_ca_cert_file_path=>nil, :ssl_ca_certs_from_system=>false, :ssl_client_cert=>nil, :ssl_client_cert_key=>nil, :sasl_gssapi_principal=>nil, :sasl_gssapi_keytab=>nil, :sasl_plain_authzid=>"", :sasl_plain_username=>nil, :sasl_plain_password=>nil, :sasl_scram_username=>nil, :sasl_scram_password=>nil, :sasl_scram_mechanism=>nil}}

There are a lot of Kafka configuration parameters. You can see that required_acks defaults to -1, which means a write is only considered successful after the leader has received the message and it has been replicated to the in-sync followers. This setting results in higher latency than 0 or 1: with 0, the message is considered successful as soon as it is sent, and with 1, it is successful once the leader has received it. You can also see that we are not using SSL or SASL to secure the Kafka connection. We can now send a message:

WaterDrop::AsyncProducer.call('Hi water drop', topic: 'test-top')
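
In a real Rails app this setup would not be typed into the console each time; it usually lives in an initializer, where you can also tune the acknowledgement level discussed above. A minimal sketch, assuming a hypothetical config/initializers/waterdrop.rb and the WaterDrop 1.x config keys shown in the defaults dump:

# config/initializers/waterdrop.rb (hypothetical path)
WaterDrop.setup do |config|
  config.deliver = true
  config.kafka.seed_brokers = %w[kafka://localhost:9092]
  # Wait only for the leader's acknowledgement to reduce latency;
  # the default of -1 waits for all in-sync replicas.
  config.kafka.required_acks = 1
end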

That covers the producer side. To consume these messages from a Rails app, you have to install the development tools first. On my Ubuntu VM:

sudo apt-get install git-core curl zlib1g-dev build-essential libssl-dev libreadline-dev libyaml-dev libsqlite3-dev sqlite3 libxml2-dev libxslt1-dev libcurl4-openssl-dev software-properties-common libffi-dev nodejs yarn

We can now install the rails gem. Add the racecar gem to the Gemfile and run bundle install.
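
A minimal Gemfile sketch, assuming we keep waterdrop in the same app for producing:

# Gemfile (sketch)
gem 'racecar'
gem 'waterdrop'

Then create the racecar configuration file: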

bundle exec rails g racecar:install

The generated config file:

# These config values will be shared by all environments but can be overridden.
common: &common
  client_id: "rafk"

development:
  <<: *common
  brokers:
    - localhost:9092

test:
  <<: *common
  brokers:
    - localhost:9092

production:
  <<: *common
  brokers:
    - kafka1.myapp.com:9092
    - kafka2.myapp.com:9092
    - kafka3.myapp.com:9092

One interesting thing is that you can change localhost to kafka and it will still work. You can generate a consumer class by using the racecar generator:

bundle exec rails g racecar:consumer Print

This will generate print_consumer.rb with a class that extends Racecar::Consumer (a batch-processing variant is sketched at the end of this section):

class PrintConsumer < Racecar::Consumer
  subscribes_to "test-top"

  def process(message)
    puts "Received message: #{message.value}"
  end
end

I have changed the topic name to test-top. We can read the message from the topic using this consumer:

bundle exec racecar PrintConsumer

This will output:

=> Starting Racecar consumer PrintConsumer...
=> Detected Rails, booting application...
=> Wrooooom!
=> Ctrl-C to shutdown consumer
I, [2019-02-04T01:53:37.132672 #14065]  INFO -- : New topics added to target list: test-top
I, [2019-02-04T01:53:37.132820 #14065]  INFO -- : Fetching cluster metadata from kafka://localhost:9092
D, [2019-02-04T01:53:37.132963 #14065] DEBUG -- : Opening connection to localhost:9092 with client id rafk...
D, [2019-02-04T01:53:37.133904 #14065] DEBUG -- : Sending topic_metadata API request 1 to localhost:9092
D, [2019-02-04T01:53:37.134239 #14065] DEBUG -- : Waiting for response 1 from localhost:9092
D, [2019-02-04T01:53:37.145499 #14065] DEBUG -- : Received response 1 from localhost:9092
I, [2019-02-04T01:53:37.145709 #14065]  INFO -- : Discovered cluster metadata; nodes: localhost:9092 (node_id=0)
D, [2019-02-04T01:53:37.145799 #14065] DEBUG -- : Closing socket to localhost:9092
I, [2019-02-04T01:53:37.146486 #14065]  INFO -- : Joining group `print-consumer`

Received message: Hi water drop

The output shows the one message we published to the test-top topic. We can also see that the consumer joins a consumer group named print-consumer. We can run many instances of our consumer and they will all join this consumer group. In a production environment, they would run on different hosts to achieve failover.
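
As a side note, a Racecar consumer does not have to handle one message at a time; racecar also supports batch processing via process_batch. A minimal sketch of a batch variant of our consumer (treat the exact batch interface here as an assumption):

class PrintBatchConsumer < Racecar::Consumer
  subscribes_to "test-top"

  # Called with a whole batch of fetched messages instead of one message at a time.
  def process_batch(batch)
    batch.messages.each do |message|
      puts "Received message: #{message.value}"
    end
  end
end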

Running Multiple Consumers in a Group

We can publish multiple messages and see what happens when we run multiple consumers:

10.times do |index|
  WaterDrop::AsyncProducer.call("Counting #{index}", topic: 'test-top')
end

You can run multiple consumers by running the command:

bundle exec racecar PrintConsumer

multiple times. I ran two print consumers, but I only saw messages being consumed in one of them; the other consumer was always idle. To troubleshoot this problem, let's install kafkacat:

apt-get install kafkacat

We can now get the metadata for all topics:

kafkacat -L -b localhost
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
 1 brokers:
  broker 0 at localhost:9092
 6 topics:
  topic "test-topic" with 1 partitions:

As you can see from this output, there is only one partition for the test-top topic. Remember that a consumer group cannot have more active consumers than there are partitions in the topic; any extra consumers sit idle. This is the reason why we did not see any messages being consumed in the second print consumer. The topic was created automatically by the waterdrop gem when we published a message to it, and by default the number of partitions is one. We can always increase the number of partitions, but we cannot reduce it.
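
To let both print consumers do work, we could increase the partition count with the Kafka command line tools. A sketch, assuming the scripts that ship with the Kafka broker are on the PATH (older brokers use --zookeeper localhost:2181 instead of --bootstrap-server):

kafka-topics.sh --alter --topic test-top --partitions 2 --bootstrap-server localhost:9092

After re-running the two print consumers, each one should be assigned one of the two partitions.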
