Dạ em vẫn chưa tìm ra nguyên nhân. Sau đây là code của em:
Khi em tạo transaction thành công thì em sẽ gọi service này:
import { Inject, Injectable } from '@nestjs/common'
import { ClientProxy } from '@nestjs/microservices'
/**
 * Sends transaction messages to the MQTT broker through the client proxy
 * registered under the `TRANSACTION_SERVICE` token.
 */
@Injectable()
export class MqttActionService {
  constructor(
    @Inject('TRANSACTION_SERVICE')
    private readonly client: ClientProxy
  ) {
    // connect() returns a promise and a constructor cannot await it, so a
    // rejection handler is attached to keep an unreachable broker from
    // surfacing as an unhandled promise rejection.
    this.client.connect().catch((err: unknown) => {
      console.error('Failed to connect to the MQTT broker', err)
    })
  }

  /**
   * Sends the `{ cmd: 'new_transaction' }` request with a fixed
   * 'Hello world' payload.
   *
   * The observable returned by `send()` is cold: the request is only
   * published once it is subscribed to, which is why `subscribe()` is called
   * here. The reply value is ignored; the returned promise resolves
   * immediately and does not wait for a response.
   */
  async publishMessageToMqttServer(): Promise<void> {
    this.client.send({ cmd: 'new_transaction' }, 'Hello world').subscribe({
      // Without an error callback a failed delivery would be rethrown
      // asynchronously by RxJS instead of being handled here.
      error: (err: unknown) => {
        console.error('Failed to deliver the new_transaction message', err)
      }
    })
  }
}
/**
 * MQTT client registration exposed to providers under the
 * `TRANSACTION_SERVICE` injection token.
 */
const transactionMqttClient = ClientsModule.register([
  {
    name: 'TRANSACTION_SERVICE',
    transport: Transport.MQTT,
    options: { url: 'mqtt://localhost:1883' }
  }
])

/**
 * Module that provides and re-exports every entry of `services`, with the
 * TypeORM modules, HTTP module, app configuration and the MQTT client
 * available for injection.
 */
@Module({
  imports: [
    ...typeOrmModules,
    HttpModule,
    AppConfigModule,
    transactionMqttClient
  ],
  providers: services,
  exports: services
})
export class ActionsModule {}
Broker
// Broker bootstrap: create the Aedes broker instance and a plain TCP server
// whose connections are handed to the broker's `handle` listener.
const createBroker = require("aedes");
const net = require("net");

const aedes = createBroker();
const server = net.createServer(aedes.handle);

// TCP port the broker server listens on.
const port = 1883;
// Log a line whenever the broker emits its "client" event.
aedes.on("client", function onClientConnected() {
  console.log("Client connected");
});
// Log every message published by a connected client: who published it, the
// payload, the topic, and the id of this broker.
aedes.on("publish", (packet, client) => {
  // Publish events without a client are skipped (presumably messages that
  // originate from the broker itself — confirm against the Aedes docs).
  if (!client) {
    return;
  }

  // `client` is guaranteed to be set here, so its id is used directly; the
  // former broker-id fallback could never be reached behind the guard.
  console.log(
    `MESSAGE_PUBLISHED : MQTT Client ${client.id} has published message "${packet.payload}" on ${packet.topic} to aedes broker ${aedes.id}`
  );
});
// Log every subscription request: the subscribing client, the comma-joined
// list of requested topics, and the id of this broker.
aedes.on("subscribe", (subscriptions, client) => {
  // When no client object is supplied, the raw (falsy) value is printed as-is.
  const clientLabel = client ? client.id : client;
  const topics = subscriptions.map(({ topic }) => topic).join(",");

  console.log(
    `TOPIC_SUBSCRIBED : MQTT Client ${clientLabel} subscribed to topic: ${topics} on aedes broker ${aedes.id}`
  );
});
// Start accepting TCP connections and report the port once listening.
server.listen(port, () => {
  console.log("server started and listening on port ", port);
});
Notification Module
import { Controller, Get, Inject } from '@nestjs/common'
import { AppService } from './app.service'
import {
ClientProxy,
EventPattern,
MessagePattern,
Payload
} from '@nestjs/microservices'
/**
 * Controller of the notification application: serves the HTTP root route and
 * declares the handler for `{ cmd: 'new_transaction' }` messages.
 */
@Controller()
export class AppController {
  constructor(
    @Inject('NOTIFICATION_SERVICE')
    private readonly client: ClientProxy,
    private readonly appService: AppService
  ) {
    // connect() returns a promise and a constructor cannot await it, so a
    // rejection handler is attached to keep an unreachable broker from
    // surfacing as an unhandled promise rejection.
    this.client.connect().catch((err: unknown) => {
      console.error('Failed to connect to the MQTT broker', err)
    })
  }

  /** Returns the greeting produced by `AppService.getHello()`. */
  @Get()
  getHello(): string {
    return this.appService.getHello()
  }

  /**
   * Replies to a `{ cmd: 'new_transaction' }` request with a greeting that
   * embeds the received payload.
   *
   * NOTE(review): message-pattern handlers are only invoked when the
   * application itself runs as a (hybrid) microservice on the MQTT transport,
   * e.g. via `connectMicroservice` / `createMicroservice` in the bootstrap
   * file. That file is not visible here — confirm it is set up, otherwise
   * this handler never subscribes to the request topic.
   *
   * @param data - payload carried by the incoming message
   * @returns the reply sent back to the requester
   */
  @MessagePattern({ cmd: 'new_transaction' })
  test(@Payload() data: string): string {
    return `Hello bro ${data}`
  }
}
import { Module } from '@nestjs/common'
import { ClientsModule, Transport } from '@nestjs/microservices'
import { AppController } from './app.controller'
import { AppService } from './app.service'
/**
 * MQTT client registration exposed to providers under the
 * `NOTIFICATION_SERVICE` injection token.
 */
const notificationMqttClient = ClientsModule.register([
  {
    name: 'NOTIFICATION_SERVICE',
    transport: Transport.MQTT,
    options: { url: 'mqtt://localhost:1883' }
  }
])

/**
 * Root module of the notification application: wires the MQTT client,
 * `AppController` and `AppService` together.
 */
@Module({
  imports: [notificationMqttClient],
  controllers: [AppController],
  providers: [AppService]
})
export class AppModule {}
Khi em start Broker lên và tạo transaction
server started and listening on port 1883
Client connected
MESSAGE_PUBLISHED : MQTT Client mqttjs_fa50b9af has published message "{"pattern":{"cmd":"new_transaction"},"data":"Hello world","id":"66a8a368-fa67-419b-8fae-556880dc7ba3"}" on {"cmd":"new_transaction"} to aedes broker 7a19aefe-3c44-48fd-96fd-7cf812d474ef
Client connected
TOPIC_SUBSCRIBED : MQTT Client mqttjs_fa50b9af subscribed to topic: {"cmd":"new_transaction"}/reply on aedes broker 7a19aefe-3c44-48fd-96fd-7cf812d474ef
Note: Nếu dùng .subscribe ở function publishMessageToMqttServer thì sẽ log ra subscribe như ở trên ạ