Publish message Kafka chạy chậm x10 so với không dùng kafka

Em chào mọi người ạ, em đang sử dụng kafka cho hệ thống của em, em đặt 1 cái application gateway (grpc server) push message vào kafka broker và return response về (chưa xử lý) thì thời gian response rất lâu ạ, toàn hơn 1000ms (sau đó listener service sẽ hứng request từ kafka broker và gọi request đến main-service). So với việc gọi request trực tiếp từ ingress gateway đến main-service thì tính luôn thời gian xử lý chỉ dao động 100ms. Nhờ mọi người xem giúp em ạ. Dưới đây là một vài đoạn code của application gateway bằng golang
user.go

type userMicroservice struct {
	cfg           *common.Config
	kafkaProducer kafka_client.Producer
	service_auth.UnimplementedUserServiceServer
}

func NewUserService(cfg *common.Config, kafkaProducer kafka_client.Producer) *userMicroservice {
	return &userMicroservice{
		cfg:           cfg,
		kafkaProducer: kafkaProducer,
	}
}

func (x *userMicroservice) UpdateUser(ctx context.Context, req *service_auth.UpdateUserRequest) (*service_auth.UpdateUserResponse, error) {
	reqBytes, err := proto.Marshal(req)
	if err != nil {
		return nil, err
	}

	err = x.kafkaProducer.PublishMessage(ctx, kafka.Message{
		Topic:   x.cfg.KafkaTopicsConfig.UserUpdate.TopicName,
		Value:   reqBytes,
		Time:    time.Now().UTC(),
		Headers: kafka_client.GrpcMetadataToKafkaHeader(ctx),
	})

	if err != nil {
		return nil, err
	}

	return &service_auth.UpdateUserResponse{}, nil
}

producer.go


type Producer interface {
	PublishMessage(ctx context.Context, msgs ...kafka.Message) error
	Close() error
}

type producer struct {
	brokers []string
	w       *kafka.Writer
}

func NewProducer(brokers []string) *producer {
	return &producer{
		brokers: brokers,
		w:       NewWriter(brokers),
	}
}

func (p *producer) PublishMessage(ctx context.Context, msgs ...kafka.Message) error {
	return p.w.WriteMessages(ctx, msgs...)
}

func (p *producer) Close() error {
	return p.w.Close()
}

writer.go


const (
	writerReadTimeout  = 10 * time.Second
	writerWriteTimeout = 10 * time.Second
	writerRequiredAcks = -1
	writerMaxAttempts  = 3
)

// NewWriter create new configured kafka writer
func NewWriter(brokers []string) *kafka.Writer {
	w := &kafka.Writer{
		Addr:         kafka.TCP(brokers...),
		Balancer:     &kafka.LeastBytes{},
		RequiredAcks: writerRequiredAcks,
		MaxAttempts:  writerMaxAttempts,
		Compression:  compress.Snappy,
		ReadTimeout:  writerReadTimeout,
		WriteTimeout: writerWriteTimeout,
		Async:        false,
	}
	return w
}

docker-compose.yaml

  zookeeper:
    image: zookeeper:3.4.9
    restart: "no"
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./zookeeper/data:/data
      - ./zookeeper/datalog:/datalog
  kafka:
    image: confluentinc/cp-kafka:5.5.1
    restart: "no"
    ports:
      - "9092:9092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-host.docker.internal}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-host.docker.internal}
    volumes:
      - ./kafka/data:/var/lib/kafka/data
    depends_on:
      - zookeeper

image
image

Cậu có thể cho tớ một cái hình mô tả kiến trúc hệ thống của cậu được không? Tớ không hiểu lắm kiến trúc hiện tại của cậu.

Thời gian 1s ở trên là flow publish message hay consume message vậy cậu?


Về cơ bản, chỉ với code config Kafka và mấy mô tả, không ai có thể biết thêm điều gì từ đó. Cậu cần thu hẹp phạm vi vấn đề lại.
Cậu có thể thử đặt các dòng log để in ra thời gian thực thi ở các đoạn code quan trọng. Cậu có thể check xem chính xác đoạn code nào chậm trước khi phân tích thêm.

Hope it helps!

3 Likes

Em cảm ơn anh đã quan tâm.
Em gửi hình hệ thống của em hiện tại

Ingress gateway hứng request từ bên ngoài server, hiện tại em đang chuyển hướng request updateUser sang cict-gateway, service này nhận request và push message vào kafka broker đặt ở docker nằm bên ngoài kubernetes; sau đó cict-listener nhận request từ broker và gửi request cho cict-auth.
Việc xử lý request ở cict-gateway chỉ là push message lên broker thôi ạ, không có thao tác nào khác nữa.

đây chỉ mới push message thôi ạ chứ chưa tính đến consume nữa.

83% thành viên diễn đàn không hỏi bài tập, còn bạn thì sao?