Featured image of post RocketMQ

RocketMQ

import

1
import "github.com/apache/rocketmq-client-go/v2"

log

1
import "github.com/apache/rocketmq-client-go/v2/rlog"
1
// Set the RocketMQ client library's log level to "error".
rlog.SetLogLevel("error")

Topic

CreateTopic

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Create the topic on the given broker unless the name server already lists it.
// Sample value: nameSrvAddr := "127.0.0.1:9876"
adminClient, err := admin.NewAdmin(
    admin.WithResolver(primitive.NewPassthroughResolver([]string{nameSrvAddr})),
    admin.WithCredentials(primitive.Credentials{
        AccessKey: accessKey,
        SecretKey: secretKey,
    }),
)
if err != nil {
    panic(err)
}
// Release the admin client on every exit path, including the early return
// below that is taken when the topic already exists.
defer func() {
    // Best-effort cleanup: a failed close is not actionable here.
    _ = adminClient.Close()
}()

list, err := adminClient.FetchAllTopicList(ctx)
if err != nil {
    panic(err)
}

// Nothing to create when the topic is already known.
for _, name := range list.TopicList {
    if name == topic {
        return
        //deleteTopic(ctx, topic)
    }
}

err = adminClient.CreateTopic(ctx,
    admin.WithTopicCreate(topic),
    admin.WithBrokerAddrCreate(brokerAddr),
)
if err != nil {
    panic(err)
}

DeleteTopic

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// Delete the topic from the given broker.
// Sample value: nameSrvAddr := "127.0.0.1:9876"
adminClient, err := admin.NewAdmin(
    admin.WithResolver(primitive.NewPassthroughResolver([]string{nameSrvAddr})),
    admin.WithCredentials(primitive.Credentials{
        AccessKey: accessKey,
        SecretKey: secretKey,
    }),
)
if err != nil {
    panic(err)
}
// Release the admin client whether the delete succeeds or panics.
defer func() {
    // Best-effort cleanup: a failed close is not actionable here.
    _ = adminClient.Close()
}()

// Sample value: brokerAddr := "127.0.0.1:10911"
err = adminClient.DeleteTopic(ctx,
    admin.WithTopicDelete(topic),
    admin.WithBrokerAddrDelete(brokerAddr),
)
if err != nil {
    panic(err)
}

Consumer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// workId := "1"
// consumerGroup := "consumer"
// nameSrvAddr := "127.0.0.1:9876"
c, err := rocketmq.NewPushConsumer(
    consumer.WithInstance(workId),
    consumer.WithGroupName(consumerGroup),
    consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{nameSrvAddr})),
)
if err != nil {
    panic(err)
}

err = c.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context,
    messages ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
    for _, msg := range messages {
        _ = msg
    }

    return consumer.ConsumeSuccess, nil
})
if err != nil {
    panic(err)
}

err = c.Start()
if err != nil {
    panic(err)
}

<-ctx.Done()

err = c.Shutdown()
if err != nil {
    panic(err)
}

Producer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// nameSrvAddr := "127.0.0.1:9876"
p, err := rocketmq.NewProducer(
    producer.WithNsResolver(primitive.NewPassthroughResolver([]string{nameSrvAddr})),
    producer.WithRetry(2),
)
if err != nil {
    panic(err)
}

err = p.Start()
if err != nil {
    panic(err)
}

msg := primitive.NewMessage(topic, []byte(value))
//msg.WithDelayTimeLevel(3)
res, err := p.SendSync(ctx, msg)
if err != nil {
    panic(err)
}

_ = res

err = p.Shutdown()
if err != nil {
    panic(err)
}

Docker

compose.yml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# Single-host RocketMQ stack: name server, one broker and the dashboard.
# The broker and dashboard use network_mode "service:namesrv", so every
# published port for all three services is declared on namesrv.
services:
  namesrv:
    image: apache/rocketmq:4.9.6
    container_name: rmq.namesrv
    mem_limit: 1g
    ports:
      - 9876:9876
      - 10909:10909 # for broker
      - 10911:10911 # for broker
      - 10912:10912 # for broker
      - 18080:8080 # for dashboard
    networks:
      - rocketmq
    command: sh mqnamesrv
    restart: unless-stopped

  broker:
    image: apache/rocketmq:4.9.6
    container_name: rmq.broker
    mem_limit: 3g
    environment:
      - NAMESRV_ADDR=rmq.namesrv:9876
    volumes:
      # Mount the local broker.conf at the path passed to mqbroker via -c below.
      - ./broker.conf:/home/rocketmq/rocketmq-4.9.6/conf/broker.conf
    # Reuse the namesrv service's network; ports are published there.
    network_mode: "service:namesrv"
    command: sh mqbroker -c /home/rocketmq/rocketmq-4.9.6/conf/broker.conf
    restart: unless-stopped
    depends_on:
      - namesrv

  dashboard:
    image: apacherocketmq/rocketmq-dashboard:latest
    container_name: rmq.dashboard
    mem_limit: 1g
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=rmq.namesrv:9876
    # Reuse the namesrv service's network; host port 18080 is mapped to 8080 there.
    network_mode: "service:namesrv"
    restart: unless-stopped
    depends_on:
      - namesrv
      - broker


networks:
  rocketmq:
    name: rocketmq
    driver: bridge

broker.conf

1
# NOTE(review): presumably the address the broker advertises to clients; 127.0.0.1 only suits clients running on the Docker host - confirm for other setups.
brokerIP1=127.0.0.1
Licensed under CC BY-NC-SA 4.0