Featured image of post Kafka

Kafka

import

1
import "github.com/segmentio/kafka-go"

Topic

DeleteTopics

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// network, address := "tcp", "localhost:9092"
conn, err := kafka.Dial(network, address)
if err != nil {
    panic(err)
}

// topic := "topic"
p, _ := conn.ReadPartitions(topic)
if len(p) != 0 {
    err = conn.DeleteTopics(topic)
    if err != nil {
        panic(err)
    }
}

CreateTopics

1
2
3
4
5
6
7
8
9
// partitionLen := 1
err = conn.CreateTopics(kafka.TopicConfig{
    Topic:             topic,
    NumPartitions:     partitionLen,
    ReplicationFactor: 1, // stand-alone
})
if err != nil {
    panic(err)
}

Reader / Writer

Reader

without consumer group

1
2
3
4
5
6
// broker := []string{"localhost:9092"}
kafka.NewReader(kafka.ReaderConfig{
    Brokers:   broker,
    Topic:     topic,
    Partition: partition,
})

with consumer group

1
2
3
4
5
6
7
8
// broker := []string{"localhost:9092"}
// consumerGroupId := "consumer"
kafka.NewReader(kafka.ReaderConfig{
    Brokers:        broker,
    GroupID:        consumerGroupId,
    Topic:          topic,
    //CommitInterval: time.Second,
})

Writer

1
2
3
4
5
6
// broker := []string{"localhost:9092"}
kafka.Writer{
    Addr:  kafka.TCP(broker...),
    Topic: topic,
    //Balancer: &kafka.CRC32Balancer{},
}

Consumer

1
2
// r *kafka.Reader
msg, err := r.ReadMessage(ctx)

Producer

1
2
3
4
5
6
7
// w *kafka.Writer
err := w.WriteMessages(ctx,
    kafka.Message{
        Key:   idBytes, // partition by balancer
        Value: []byte(value),
    },
)

Docker

1
2
3
4
5
docker run -d \
    --name kafka \
    -m 2g \
    -p 9092:9092 \
    apache/kafka:latest
Licensed under CC BY-NC-SA 4.0