Kafka-SQL with Go: A new way to interact with your topics (Part 1)

TL;DR

  1. Project Goal: Develop a tool to debug Kafka topic events, addressing challenges with protobuf and async behavior, with a focus on fast data consumption and SQL-based querying.
  2. Technical Choices: Utilizes franz-go for Kafka library due to performance, Gorm as the ORM for database interactions, and SQLite for its lightweight, serverless characteristics, supporting JSON data storage and querying.
  3. Implementation Details: Describes setting up a Kafka consumer and database schema using Gorm with SQLite, emphasizing easy setup, and efficient data handling.
  4. Execution: Combines Kafka data consumption and database storage in the main function, aiming for a quick and functional prototype despite potential best practice deviations.

The GitHub repository with the current state of this project: https://github.com/douglasdgoulart/kafka-sql

Why I Started This?

Okay, so picture this: me, sitting there, trying to debug some seriously wonky events popping up in a topic. Now, mix in protobuf and its asynchronous antics making my life harder. Sure, I could’ve just slapped some logs here and there and called it a day, but where’s the fun in that? I thought, “Hey, why not make a tool not just for kicks but something that might actually be useful to someone else too?”

The dream? Whip up a tool that gobbles up data from a topic like there’s no tomorrow, making it super easy to query this stuff with SQL in a database. We’re talking about a speed demon that lets you dive into the data without wanting to change anything—purely read-only vibes.

And the game plan for this little adventure:

  • franz-go as our Kafka library: In the world of Go, choosing your library is like picking a side in a gladiator fight.
  • Gorm and SQLite: Who doesn’t love a dynamic duo that makes connecting to and frolicking around in a database feel like a breeze?

So, yeah, this is my journey—part tech experiment, part personal challenge, and all about making something cool that maybe, just maybe, someone else will find handy too.

Why SQLite?

SQLite is a relational database management system (RDBMS), similar to other SQL databases like MySQL or PostgreSQL, but with a significant difference: it’s serverless. This means it doesn’t operate based on a client-server model. Instead, SQLite runs directly within the applications that use it.

I chose this database for this project for two main reasons:

  1. It is lightweight. It has a small footprint, both in terms of disk space and memory usage, making it ideal for devices with limited resources, such as smartphones, tablets, and even IoT devices.
  2. Zero configuration. This is the main reason why I chose this database for this project. There is no need to install a server or run a Docker container.

If you want to know more about SQLite there is this

The idea is to store all the messages as JSON. SQLite has good support for JSON operations, making it easy (but not that fast) to query data inside JSON fields. Imagine that you have this message being published on your topic:

{
  "user_id": "123",
  "user_action": "login"
}

Now you will have this value in your SQLite table, and you would like to count each action of each user. You can simply do this:

SELECT
  json_extract(data, '$.user_id') AS user_id,
  json_extract(data, '$.user_action') AS user_action,
  COUNT(*) AS total_actions
FROM
  messages
GROUP BY
  user_id,
  user_action;

The output might be something like this:

user_id | user_action | total_actions
--------|-------------|--------------
123     | login       | 2
123     | purchase    | 1
124     | logout      | 1
124     | purchase    | 2
125     | login       | 1
126     | login       | 1
126     | view        | 2
127     | logout      | 1
127     | view        | 1
128     | purchase    | 1

With the idea explained, let’s move on to the actual coding part.

The Kafka consumer

Picking a Kafka library for Go is not a straightforward task. There are lots of libraries and lots of opinions about which one you should use. So, how did I choose? I chose the one that seems to be the most performant according to its own data: franz-go. What convinced me were a few opinions on Reddit and the benchmarks in its GitHub repository, where the authors claim it is many times faster than the alternatives.

This kind of benchmark should be replicated for real production applications, but I’m just doing this tiny little project on my own, and I have a lot of difficulty in getting personal things done, so just accepting others’ opinions should be the best for this project right now.

Creating a Kafka consumer in Go using franz-go is not that hard. First of all, you will have to install the library using:

go get -u github.com/twmb/franz-go
go get -u github.com/twmb/franz-go/pkg/kmsg

Right after that, we will create a struct to hold all of our internal parameters.

type KafkaConsumer struct {
	client  *kgo.Client
	msgChan chan<- *[]byte
}

This structure carries our client and our channel, where the messages will be published after being consumed. I will show the behavior of this channel in a few seconds, but the idea is to spawn a goroutine for every single consumed message, and each goroutine writes its message into the channel.

Just after defining our struct, we are going to define the struct initializer:

func NewKafkaConsumer(config *util.KafkaConfiguration, msgChan chan<- *[]byte) *KafkaConsumer {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers(config.Brokers),
		kgo.ConsumerGroup(config.GroupID),
		kgo.ConsumeTopics(config.Topic),
		kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
		kgo.SessionTimeout(time.Duration(config.SessionTimeout*int(time.Millisecond))),
		// TODO: add the missing configurations
	)
	if err != nil {
		panic(err)
	}

	return &KafkaConsumer{
		client:  cl,
		msgChan: msgChan,
	}
}

How this config works is not important right now, but keep in mind that it is the structure that will hold all of our application configurations. This function creates the client for Kafka consumption and creates our KafkaConsumer struct with the Kafka client and the channel to write the messages.
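To make the shape of that configuration a bit more concrete, here is a minimal sketch of how such a struct could be filled from environment variables using only the standard library. Everything in it is an assumption for illustration: the struct is a hypothetical stand-in for util.KafkaConfiguration, and the field types, variable names (KAFKA_BROKERS and friends) and defaults are made up, not taken from the project.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
	"time"
)

// KafkaConfiguration is a hypothetical stand-in for util.KafkaConfiguration.
// The real field types in the project may differ.
type KafkaConfiguration struct {
	Brokers        []string
	GroupID        string
	Topic          string
	SessionTimeout int // milliseconds
}

// SessionTimeoutDuration converts the millisecond value into a time.Duration.
func (c *KafkaConfiguration) SessionTimeoutDuration() time.Duration {
	return time.Duration(c.SessionTimeout) * time.Millisecond
}

// LoadKafkaConfiguration builds the configuration from a lookup function
// (os.Getenv in production, a map-backed function in tests).
// The variable names and defaults are assumptions made for this sketch.
func LoadKafkaConfiguration(lookup func(string) string) (*KafkaConfiguration, error) {
	get := func(key, fallback string) string {
		if v := lookup(key); v != "" {
			return v
		}
		return fallback
	}

	timeout, err := strconv.Atoi(get("KAFKA_SESSION_TIMEOUT_MS", "10000"))
	if err != nil {
		return nil, fmt.Errorf("invalid KAFKA_SESSION_TIMEOUT_MS: %w", err)
	}

	return &KafkaConfiguration{
		Brokers:        strings.Split(get("KAFKA_BROKERS", "localhost:9092"), ","),
		GroupID:        get("KAFKA_GROUP_ID", "kafka-sql"),
		Topic:          get("KAFKA_TOPIC", "events"),
		SessionTimeout: timeout,
	}, nil
}

func main() {
	cfg, err := LoadKafkaConfiguration(os.Getenv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	fmt.Printf("%+v (session timeout: %s)\n", *cfg, cfg.SessionTimeoutDuration())
}
```

If the brokers are kept as a slice like this, the call to kgo.SeedBrokers would need to spread it (config.Brokers...), since that option takes a variadic list of strings.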

When we declare a channel as chan<- like this, we are ensuring that the code in this context can only write into the channel and never read from it. The compiler enforces this, which might prevent us from accidentally creating deadlocks.
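Here is a tiny standalone sketch of channel directions, unrelated to Kafka, just to show what the compiler checks. The function names produce and sum are made up for the example.

```go
package main

import "fmt"

// produce only gets a send-only view of the channel.
// Writing `<-out` inside this function would be a compile error.
func produce(out chan<- int, n int) {
	for i := 0; i < n; i++ {
		out <- i
	}
	close(out) // closing is allowed on a send-only channel
}

// sum only gets a receive-only view of the channel.
// Writing `in <- 1` inside this function would be a compile error.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	// A bidirectional channel converts implicitly to either direction.
	ch := make(chan int)
	go produce(ch, 5)
	fmt.Println(sum(ch)) // 0+1+2+3+4 = 10
}
```

The restriction only applies inside the function that receives the directional type. Whoever created the channel still holds the bidirectional value.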

As you can see, not all of the configurations are there yet, but that’s what a TODO comment means: that’s a problem for future Douglas to handle.

After setting up the consumer, it is time to consume.

func (k *KafkaConsumer) Run(ctx context.Context) {
	defer k.client.Close()
	for {
		fetches := k.client.PollFetches(ctx)
		if errs := fetches.Errors(); len(errs) > 0 {
			panic(fmt.Sprint(errs))
		}

		iter := fetches.RecordIter()
		for !iter.Done() {
			record := iter.Next()
			go func() {
				k.msgChan <- &record.Value
			}()
		}
	}
}

I’m using the most basic approach that franz-go provides: getting one record at a time and processing it. The processing part is just spawning a new goroutine and sending the message into our channel to be processed. We might end up with a lot of goroutines being spawned, and since they run concurrently, the messages may reach the channel in a different order than they were consumed. I still don’t know if this is the most performant approach, but as Donald Knuth once said: “Premature optimization is the root of all evil”, so that’s another task for future Douglas, what a lucky guy.
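For comparison, here is a sketch of the alternative I did not use: sending on the channel directly from the polling loop instead of spawning a goroutine per record. It is not what the project does, and the record type below is a hypothetical stand-in for *kgo.Record so the example runs without Kafka. The trade-off is that a slow receiver blocks the loop, but in exchange the order is kept and no goroutines pile up.

```go
package main

import "fmt"

// record is a hypothetical stand-in for *kgo.Record.
type record struct {
	Offset int64
	Value  []byte
}

// forward sends each record from the calling goroutine.
// Every send blocks until the receiver (or the channel buffer) has room,
// so records arrive in the order they were iterated.
func forward(records []record, out chan<- record) {
	for _, r := range records {
		out <- r
	}
}

func main() {
	records := []record{
		{Offset: 0, Value: []byte(`{"user_action":"login"}`)},
		{Offset: 1, Value: []byte(`{"user_action":"view"}`)},
		{Offset: 2, Value: []byte(`{"user_action":"logout"}`)},
	}

	// A small buffer lets the sender run slightly ahead of the receiver.
	out := make(chan record, 2)
	go func() {
		forward(records, out)
		close(out)
	}()

	for r := range out {
		fmt.Println(r.Offset, string(r.Value))
	}
}
```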

The SQL connection

As I mentioned before, we will be using GORM as the ORM to connect to the SQLite database. Why GORM? Simple: it is the most popular, period.

To install the dependencies for this project, we run:

go get -u gorm.io/gorm
go get -u gorm.io/driver/sqlite
go get -u gorm.io/datatypes

First of all, we will define the database struct:

package model

import (
	"time"

	"gorm.io/datatypes"
	"gorm.io/gorm"
)

type Message struct {
	gorm.Model
	InjetionTime time.Time
	Topic        string
	Partition    int32
	Offset       int64
	Data         datatypes.JSON `gorm:"type:jsonb"`
}

Now the DBRepository looks like this:

package db

import (
	"github.com/douglasdgoulart/kafka-sql/internal/model"
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
	"gorm.io/gorm/clause"
)

type DBRepository struct {
	db *gorm.DB
}

func NewDBRepository(dialector gorm.Dialector) *DBRepository {
	if dialector == nil {
		dialector = sqlite.Open("database.db")
	}
	db, err := gorm.Open(dialector, &gorm.Config{})
	if err != nil {
		panic("failed to connect database")
	}

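	// AutoMigrate returns an error that should not be silently dropped.
	if err := db.AutoMigrate(&model.Message{}); err != nil {
		panic("failed to migrate database schema")
	}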

	return &DBRepository{
		db: db,
	}
}

func (d *DBRepository) SaveMessage(message *model.Message) error {
	tx := d.db.Clauses(clause.OnConflict{DoNothing: true}).Create(message)
	return tx.Error
}

func (d *DBRepository) SaveMessages(messages []*model.Message, batchSize *int) error {
	if batchSize == nil {
		defaultBatchSize := 1000
		batchSize = &defaultBatchSize
	}
	// Return the error instead of discarding it, like SaveMessage does.
	tx := d.db.Clauses(clause.OnConflict{DoNothing: true}).CreateInBatches(messages, *batchSize)
	return tx.Error
}

There are a few things to note in this little guy. In NewDBRepository, I’m saving all of the data into an SQLite file called database.db, but that’s just the default: the dialector can be swapped for a test, or even to change the database in the future. Still in the NewDBRepository function, we are using the AutoMigrate function, passing the Message struct as the schema. This function will automagically create the messages table for us, without any other interaction.

To save messages we have two functions: one saves a single message, and the other saves lots of messages in batches.

Just a tiny change

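After creating the message struct, I realized that it would be so much better if the Kafka consumer could produce an actual message structure for our channel. That means the channel type in the KafkaConsumer struct and in NewKafkaConsumer changes from chan<- *[]byte to chan<- *model.Message. So I did: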

func (k *KafkaConsumer) Run(ctx context.Context) {
	defer k.client.Close()
	for {
		fetches := k.client.PollFetches(ctx)

		if errs := fetches.Errors(); len(errs) > 0 {
			panic(fmt.Sprint(errs))
		}

		iter := fetches.RecordIter()
		for !iter.Done() {
			record := iter.Next()
			go func(record *kgo.Record) {
				// TODO: add a deserializer to allow protobuf messages
				message := &model.Message{
					InjetionTime: record.Timestamp,
					Topic:        record.Topic,
					Partition:    record.Partition,
					Offset:       record.Offset,
					Data:         datatypes.JSON(record.Value),
				}
				k.msgChan <- message
			}(record)
		}
	}
}

You also might have noticed that we added another TODO comment here. The deserialization of the message will be something to be handled in another episode of this tool.

Now we have the channel carrying all of the necessary information to save our database record.
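One thing worth keeping in mind until the deserializer exists: the Data column is meant to hold JSON, so a payload that is not JSON (a protobuf message, for example) would be stored as-is and would not be queryable with json_extract. As a stopgap, a helper along these lines could guard the conversion. This is only a sketch using the standard library. The function name toJSONPayload and the raw_base64 field are made up for illustration and are not part of the project.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// toJSONPayload returns value unchanged when it is already valid JSON.
// Otherwise it wraps the raw bytes in a small JSON object so the row
// still holds valid JSON. The "raw_base64" key is a hypothetical name.
func toJSONPayload(value []byte) []byte {
	if json.Valid(value) {
		return value
	}
	wrapped, err := json.Marshal(map[string]string{
		"raw_base64": base64.StdEncoding.EncodeToString(value),
	})
	if err != nil {
		// Not expected for a map of strings; fall back to an empty object.
		return []byte(`{}`)
	}
	return wrapped
}

func main() {
	// A JSON payload passes through untouched.
	fmt.Println(string(toJSONPayload([]byte(`{"user_id":"123"}`))))

	// A binary payload (for example protobuf bytes) gets wrapped.
	fmt.Println(string(toJSONPayload([]byte{0x08, 0x96, 0x01})))
}
```

With something like this, the consumer could assign datatypes.JSON(toJSONPayload(record.Value)) to the Data field instead of converting the raw value directly.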

The main event function of the night

Finally, we have to wrap all of this together. Just as a disclaimer, I know that it is not a good idea to throw all of the application logic into main, but I just want to have something working by the end of the day. So, now that we’ve got that out of the way, we can go into our main file.

package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/douglasdgoulart/kafka-sql/internal/db"
	"github.com/douglasdgoulart/kafka-sql/internal/kafka"
	"github.com/douglasdgoulart/kafka-sql/internal/model"
	"github.com/douglasdgoulart/kafka-sql/internal/util"
)

func main() {
	msgChan := make(chan *model.Message)
	config := util.NewConfiguration()
	fmt.Printf("KafkaConfiguration: %+v\n", config.KafkaConfiguration)
	ctx := context.Background()

	var wg sync.WaitGroup

	repository := db.NewDBRepository(nil)
	k := kafka.NewKafkaConsumer(config.KafkaConfiguration, msgChan)

	wg.Add(1)
	go func() {
		defer wg.Done()
		k.Run(ctx)
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		for msg := range msgChan {
			err := repository.SaveMessage(msg)
			if err != nil {
				fmt.Printf("Error saving message: %s\n", err)
			}
		}
	}()

	wg.Wait()
}

First of all, we’ve created our messages channel, which will be responsible for connecting our consumers to our database.

After some setup, we run our Kafka consumer in a goroutine and start another goroutine to fetch the channel messages one by one (this might be improved soon) and save them into the database.
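One possible direction for that improvement is to group messages coming out of the channel and hand them to the batch function of the repository. The sketch below shows the grouping part only, with the standard library: it flushes when a batch is full, when a timer fires, or when the channel is closed. The message type is a hypothetical stand-in for model.Message, and the flush callback is where a call like repository.SaveMessages would go. Treat it as an idea under those assumptions, not as what the project does today.

```go
package main

import (
	"fmt"
	"time"
)

// message is a hypothetical stand-in for model.Message.
type message struct {
	Offset int64
}

// collectBatches reads from in and calls flush with up to batchSize messages.
// It also flushes every maxWait so a quiet topic does not hold messages back.
// maxWait must be positive, otherwise time.NewTicker panics.
func collectBatches(in <-chan message, batchSize int, maxWait time.Duration, flush func([]message)) {
	batch := make([]message, 0, batchSize)
	ticker := time.NewTicker(maxWait)
	defer ticker.Stop()

	emit := func() {
		if len(batch) == 0 {
			return
		}
		flush(batch)
		// Start a fresh slice so flush may keep a reference to the old one.
		batch = make([]message, 0, batchSize)
	}

	for {
		select {
		case msg, ok := <-in:
			if !ok {
				emit() // channel closed: flush whatever is left
				return
			}
			batch = append(batch, msg)
			if len(batch) >= batchSize {
				emit()
			}
		case <-ticker.C:
			emit()
		}
	}
}

func main() {
	in := make(chan message)
	go func() {
		for i := int64(0); i < 7; i++ {
			in <- message{Offset: i}
		}
		close(in)
	}()

	collectBatches(in, 3, time.Second, func(batch []message) {
		fmt.Printf("saving %d messages\n", len(batch))
	})
}
```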

So much to be done

We finally have something functional, but some key features are missing:

  • It is still slow: I’m going to build a benchmark test to measure, but my initial perception is that it is far from loading 10 million messages fast into the database.
  • Protobuf deserialization: The project currently assumes that JSON messages are being published to the topic, not protobuf.
  • Configuration and setup: This project is configured through a .env file and environment variables, but I think it should be turned into a CLI with flags and commands to configure exactly what you want to build.
  • CI and testing: I’ve built some tests, but it is far from good testing. Almost all of them were just generated using AI, copied and pasted into the editor just to check if everything was working as expected. Test cases are missing. I would also like to attach those tests to a CI pipeline to ensure that I’m not breaking anything along the way.
  • CD setup: I want to turn it into a Docker image so anyone (with Docker) can run it without having to install the CLI. To accomplish that, I will containerize the application and build a CD pipeline to build and deploy the image.
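For the CLI item, the standard library flag package might already be enough for a first version. The sketch below is just a guess at what that could look like. The flag names (-brokers, -topic, -db) and their defaults are assumptions, not something the project defines today.

```go
package main

import (
	"flag"
	"fmt"
	"os"
	"strings"
)

// options holds the values parsed from the command line.
// The fields are hypothetical and only cover a few settings.
type options struct {
	Brokers []string
	Topic   string
	DBPath  string
}

// parseOptions parses args (usually os.Args[1:]) into options.
// A dedicated FlagSet keeps the function easy to call from tests.
func parseOptions(args []string) (*options, error) {
	fs := flag.NewFlagSet("kafka-sql", flag.ContinueOnError)
	brokers := fs.String("brokers", "localhost:9092", "comma-separated list of Kafka brokers")
	topic := fs.String("topic", "events", "topic to consume")
	dbPath := fs.String("db", "database.db", "path to the SQLite file")

	if err := fs.Parse(args); err != nil {
		return nil, err
	}

	return &options{
		Brokers: strings.Split(*brokers, ","),
		Topic:   *topic,
		DBPath:  *dbPath,
	}, nil
}

func main() {
	opts, err := parseOptions(os.Args[1:])
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(2)
	}
	fmt.Printf("%+v\n", *opts)
}
```

Running the binary with something like -topic orders -db orders.db would then override the defaults without touching any environment variable.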

After all, this journey has just begun, but I have something functional to be proud of. If you think this tool might be useful to you, feel free to contribute with either code or feature suggestions.

That’s all folks, stay tuned for part 2.

One Comment

  1. Niceee article! Really handy to have, mostly because Go it does not have the best logging system. How would you handle the memory limit of the db? should you clean logs or just have an expiration time?
