Logstash x Kafka Integration

Learn how simple and easy it is to read Kafka messages in different formats!

In this article, we will explore reading Kafka messages using existing plugins such as the Kafka input. I will also share some of the open issues that you can track on Elastic's discuss forum.

Prerequisites:

  • Ensure Kafka is up and running; I am running it on Docker for ease (see the sketch after this list).
  • Download the latest Logstash (I am running 8.4.3).
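
A minimal sketch of running a single-broker Kafka on Docker; the image and tag here are assumptions, so adjust them to your setup:

docker run -d --name kafka -p 9092:9092 apache/kafka:latest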

Let's get started:

String Input Message

To start, here is the Logstash sample.conf in its simplest form:

input {
  stdin { }
}

filter { }

output {
   stdout {  }
}
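
To try it out, run Logstash with this file from the Logstash install directory:

bin/logstash -f sample.conf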

With this, we can type anything into the terminal, for example another hello world, and it will give you the following output:

{
    "message" => "another hello world",
    "event" => {
        "original" => "another hello world"
    },
    "host" => {
        "hostname" => "Smits-MacBook-Pro.local"
    },
    "@timestamp" => 2022-10-14T15:17:15.561174Z,
    "@version" => "1"
}

Now let's step it up and use the Kafka input plugin:

Ref: elastic.co/guide/en/logstash/current/plugin..

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["hash-node-topic"]
  }
}

filter { }

output {
   stdout {  }
}
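
If the topic does not exist yet (and auto-creation is disabled on your broker), create it first:

kafka-topics --bootstrap-server localhost:9092 --create --topic hash-node-topic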

Let's publish some events to Kafka using the command line, a tool, or any application.

For this tutorial, I am using the command line:

kafka-console-producer --broker-list localhost:9092 --topic hash-node-topic

It will prompt you for a message; let's enter the same message as earlier: hello world

If the configuration is correct, Logstash will show the output below:

{
       "message" => "hello world",
    "@timestamp" => 2022-10-14T15:22:21.454041Z,
      "@version" => "1",
         "event" => {
        "original" => "hello world"
    }
}

Whether the input is Kafka or stdin, the structure and key fields of the Logstash event aren't altered.

JSON Input Message

Let's make the input a little more complex. Since the next steps require publishing Kafka messages in formats other than plain strings, I recommend using Java or another language to publish the message.
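
That said, since a JSON document is still just a string on the wire, the console producer from earlier works for this step too:

kafka-console-producer --broker-list localhost:9092 --topic hash-node-topic
# then type the JSON as a single line:
{"first_name":"smit","last_name":"shah"}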

Without altering the Logstash configuration, but publishing the message in JSON format, the Logstash output will look similar to this:

{
       "message" => "{\"first_name\":\"smit\",\"last_name\":\"shah\"}",
    "@timestamp" => 2022-10-14T15:28:20.207420Z,
      "@version" => "1",
         "event" => {
        "original" => "{\"first_name\":\"smit\",\"last_name\":\"shah\"}"
    }
}

However, the above message is not easy to work with in code, say if you want to alter it further and add logic on top of it. Let's use codec => json to have Logstash parse it for us.

The updated configuration will look like this:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["hash-node-topic"]
    codec => json
  }
}

filter { }

output {
   stdout {  }
}

Let's produce the same JSON message; now you will see a slightly different output.

{
      "@version" => "1",
    "@timestamp" => 2022-10-14T15:33:10.591611Z,
     "last_name" => "shah",
    "first_name" => "smit",
         "event" => {
        "original" => "{\"first_name\":\"smit\",\"last_name\":\"shah\"}"
    }
}

Not only can you see the original JSON, but also the parsed JSON, with its fields promoted directly to the root level of the output.

Avro Input Message

Before heading into an even more complex topic, I recommend reading up on Kafka's Schema Registry and setting it up on your local machine, since Avro support requires a Schema Registry.
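
If you don't have one running yet, here is a rough sketch using Docker; the image tag and the host.docker.internal address (Docker Desktop on macOS) are assumptions:

docker run -d --name schema-registry -p 8081:8081 \
  -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
  -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=host.docker.internal:9092 \
  confluentinc/cp-schema-registry:7.3.0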

Here is the Avro schema I will be using:

{
    "type": "record",
    "name": "MyRecord",
    "namespace": "com.mycompany",
    "fields" : [
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"}
    ]
}
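
One way to register it is via the Schema Registry REST API; note that the subject name is the topic name with a -value suffix:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"MyRecord\",\"namespace\":\"com.mycompany\",\"fields\":[{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"}]}"}' \
  http://localhost:8081/subjects/hash-node-topic-avro-value/versions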

Once registered, you can verify the schema with the URL below:

localhost:8081/subjects/hash-node-topic-avr..

The output will be similar to the schema above:

{"subject":"hash-node-topic-avro-value","version":2,"id":3,"schema":"{\"type\":\"record\",\"name\":\"MyRecord\",\"namespace\":\"com.mycompany\",\"fields\":[{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"}]}"}

Now let's reconfigure Logstash to read from the Schema Registry:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["hash-node-topic-avro"]
    schema_registry_url => "http://localhost:8081"
  }
}

filter { }

output {
   stdout {  }
}

As you can see, we only had to add schema_registry_url, and the rest is automatically taken care of by Logstash. It also parses the message exactly the way it did when we added the codec.
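
To produce a test Avro message, one option is Confluent's kafka-avro-console-producer (a sketch, assuming the Confluent tooling is installed):

kafka-avro-console-producer --broker-list localhost:9092 \
  --topic hash-node-topic-avro \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema='{"type":"record","name":"MyRecord","namespace":"com.mycompany","fields":[{"name":"first_name","type":"string"},{"name":"last_name","type":"string"}]}'
# then type:
{"first_name": "hello", "last_name": "world"}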

Output:

{
    "@timestamp" => 2022-10-14T15:43:46.254993Z,
     "last_name" => "world",
    "first_name" => "hello",
      "@version" => "1",
         "event" => {
        "original" => "{\"first_name\": \"hello\", \"last_name\": \"world\"}"
    }
}

That's about it for this tutorial. In the next one, we will explore the Kafka output plugin.

Feel free to share your thoughts and feedback in the comments or on Twitter at @smit_shah_95