Em chào mọi người ạ, em đang sử dụng kafka cho hệ thống của em, em đặt 1 cái application gateway (grpc server) push message vào kafka broker và return response về (chưa xử lý) thì thời gian response rất lâu ạ, toàn hơn 1000ms (sau đó listener service sẽ hứng request từ kafka broker và gọi request đến main-service). So với việc gọi request trực tiếp từ ingress gateway đến main-service thì tính luôn thời gian xử lý chỉ dao động 100ms. Nhờ mọi người xem giúp em ạ. Dưới đây là một vài đoạn code của application gateway bằng golang
user.go
type userMicroservice struct {
cfg *common.Config
kafkaProducer kafka_client.Producer
service_auth.UnimplementedUserServiceServer
}
func NewUserService(cfg *common.Config, kafkaProducer kafka_client.Producer) *userMicroservice {
return &userMicroservice{
cfg: cfg,
kafkaProducer: kafkaProducer,
}
}
func (x *userMicroservice) UpdateUser(ctx context.Context, req *service_auth.UpdateUserRequest) (*service_auth.UpdateUserResponse, error) {
reqBytes, err := proto.Marshal(req)
if err != nil {
return nil, err
}
err = x.kafkaProducer.PublishMessage(ctx, kafka.Message{
Topic: x.cfg.KafkaTopicsConfig.UserUpdate.TopicName,
Value: reqBytes,
Time: time.Now().UTC(),
Headers: kafka_client.GrpcMetadataToKafkaHeader(ctx),
})
if err != nil {
return nil, err
}
return &service_auth.UpdateUserResponse{}, nil
}
producer.go
type Producer interface {
PublishMessage(ctx context.Context, msgs ...kafka.Message) error
Close() error
}
type producer struct {
brokers []string
w *kafka.Writer
}
func NewProducer(brokers []string) *producer {
return &producer{
brokers: brokers,
w: NewWriter(brokers),
}
}
func (p *producer) PublishMessage(ctx context.Context, msgs ...kafka.Message) error {
return p.w.WriteMessages(ctx, msgs...)
}
func (p *producer) Close() error {
return p.w.Close()
}
writer.go
const (
writerReadTimeout = 10 * time.Second
writerWriteTimeout = 10 * time.Second
writerRequiredAcks = -1
writerMaxAttempts = 3
)
// NewWriter create new configured kafka writer
func NewWriter(brokers []string) *kafka.Writer {
w := &kafka.Writer{
Addr: kafka.TCP(brokers...),
Balancer: &kafka.LeastBytes{},
RequiredAcks: writerRequiredAcks,
MaxAttempts: writerMaxAttempts,
Compression: compress.Snappy,
ReadTimeout: writerReadTimeout,
WriteTimeout: writerWriteTimeout,
Async: false,
}
return w
}
docker-compose.yaml
zookeeper:
image: zookeeper:3.4.9
restart: "no"
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zookeeper:2888:3888
volumes:
- ./zookeeper/data:/data
- ./zookeeper/datalog:/datalog
kafka:
image: confluentinc/cp-kafka:5.5.1
restart: "no"
ports:
- "9092:9092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-host.docker.internal}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-host.docker.internal}
volumes:
- ./kafka/data:/var/lib/kafka/data
depends_on:
- zookeeper