IoT Tool Aedes Tutorial 1: Basic Introduction

1. Introduction

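Commonly used IoT message servers (MQTT brokers) include EMQX, Mosquitto, and Mosca.

Of these, Mosca is implemented in Node.js, but the Mosca project on GitHub is no longer maintained.

The original Mosca team went on to build Aedes, which supports data persistence and clustering and offers better performance.

In Node-RED, the configuration node node-red-contrib-aedes lets you quickly start an IoT message broker and try Aedes out.

威武网 will keep publishing material on Aedes, so we can all learn together!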

2. API Overview

Reference documentation:

https://github.com/moscajs/aedes/blob/master/docs/Aedes.md

  • new Aedes([options]) / new Aedes.Server([options])

  • aedes.id

  • aedes.connectedClients

  • aedes.closed

  • Event: client

  • Event: clientReady

  • Event: clientDisconnect

  • Event: clientError

  • Event: connectionError

  • Event: keepaliveTimeout

  • Event: publish

  • Event: ack

  • Event: ping

  • Event: subscribe

  • Event: unsubscribe

  • Event: connackSent

  • Event: closed

  • aedes.handle (stream)

  • aedes.subscribe (topic, deliverfunc, callback)

  • aedes.unsubscribe (topic, deliverfunc, callback)

  • aedes.publish (packet, callback)

  • aedes.close ([callback])

  • Handler: decodeProtocol (client, buffer)

  • Handler: preConnect (client, callback)

  • Handler: authenticate (client, username, password, callback)

  • Handler: authorizePublish (client, packet, callback)

  • Handler: authorizeSubscribe (client, subscription, callback)

  • Handler: authorizeForward (client, packet)

  • Handler: published (packet, client, callback)

For details on how to use each API, see the reference documentation on GitHub linked above.
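As an illustration of the handler signatures listed above, here is a minimal sketch of an `authenticate` and an `authorizePublish` handler written as plain functions. The `USERS` map, the `alice`/`secret` credentials, and the `admin/` topic prefix are hypothetical values chosen for this example. The sketch assumes the password arrives as a Buffer and that a rejected login is reported through an error carrying `returnCode = 4`. Check both assumptions against the reference documentation before relying on them.

```javascript
// Hypothetical in-memory credential store, for illustration only.
const USERS = new Map([['alice', 'secret']])

// Handler: authenticate (client, username, password, callback)
// Assumption: `password` is a Buffer, or undefined when the client sent none.
function authenticate (client, username, password, callback) {
  const expected = USERS.get(username)
  const ok = expected !== undefined &&
    password !== undefined &&
    password.toString() === expected
  if (ok) {
    return callback(null, true)
  }
  const error = new Error('Auth error')
  error.returnCode = 4 // MQTT 3.1.1 CONNACK code: bad user name or password
  callback(error, null)
}

// Handler: authorizePublish (client, packet, callback)
// Rejects any publish to a topic under the hypothetical "admin/" prefix.
function authorizePublish (client, packet, callback) {
  if (packet.topic.startsWith('admin/')) {
    return callback(new Error('publishing to admin/ is not allowed'))
  }
  callback(null)
}

// Wiring, assuming `aedes` is a broker instance created with require('aedes')():
//   aedes.authenticate = authenticate
//   aedes.authorizePublish = authorizePublish
```

Keeping the handlers as standalone functions means they can be unit-tested without starting a broker.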

3. Examples

3.1 Simple plain MQTT server

const aedes = require('aedes')()
const server = require('net').createServer(aedes.handle)
const port = 1883

server.listen(port, function () {
  console.log('server started and listening on port ', port)
})
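To check that the server above is really answering MQTT, one option is to send it a hand-built CONNECT packet over a raw TCP socket and look for a CONNACK in reply. The following is a rough sketch of that idea using only Node.js built-ins. The helper names `buildConnectPacket` and `probeBroker`, the client id `probe`, and the 3-second timeout are all made up for this example, and the sketch assumes the whole CONNACK arrives in the first data chunk.

```javascript
const net = require('net')

// Build a minimal MQTT 3.1.1 CONNECT packet (clean session, no credentials).
// This sketch only supports packets whose "remaining length" fits in one byte.
function buildConnectPacket (clientId, keepAliveSeconds = 60) {
  const id = Buffer.from(clientId, 'utf8')
  const variableHeader = Buffer.from([
    0x00, 0x04, 0x4d, 0x51, 0x54, 0x54, // length-prefixed protocol name "MQTT"
    0x04, // protocol level 4 = MQTT 3.1.1
    0x02, // connect flags: clean session only
    (keepAliveSeconds >> 8) & 0xff, keepAliveSeconds & 0xff
  ])
  const payload = Buffer.concat([
    Buffer.from([(id.length >> 8) & 0xff, id.length & 0xff]), // client id length
    id
  ])
  const remainingLength = variableHeader.length + payload.length
  if (remainingLength > 127) {
    throw new RangeError('client id too long for this sketch')
  }
  // 0x10 = CONNECT packet type in the fixed header
  return Buffer.concat([Buffer.from([0x10, remainingLength]), variableHeader, payload])
}

// Connect, send CONNECT, and report whether the broker accepted the session.
function probeBroker (port, host, callback) {
  let done = false
  const finish = (err, accepted) => {
    if (done) return
    done = true
    callback(err, accepted)
  }
  const socket = net.connect(port, host, () => {
    socket.write(buildConnectPacket('probe'))
  })
  socket.setTimeout(3000, () => socket.destroy(new Error('timeout')))
  socket.once('data', (data) => {
    // CONNACK is 0x20 0x02 <flags> <return code>; return code 0 means accepted
    const accepted = data.length >= 4 && data[0] === 0x20 && data[3] === 0x00
    socket.end()
    finish(null, accepted)
  })
  socket.once('error', (err) => finish(err))
}
```

With the broker from this section running, `probeBroker(1883, '127.0.0.1', (err, ok) => console.log(err || ok))` should log `true` if the connection was accepted.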

3.2 MQTT over TLS / MQTTS

const fs = require('fs')
const aedes = require('aedes')()
const port = 8883

const options = {
  key: fs.readFileSync('YOUR_PRIVATE_KEY_FILE.pem'),
  cert: fs.readFileSync('YOUR_PUBLIC_CERT_FILE.pem')
}

const server = require('tls').createServer(options, aedes.handle)

server.listen(port, function () {
  console.log('server started and listening on port ', port)
})

3.3 MQTT server over WebSocket

const aedes = require('aedes')()
const httpServer = require('http').createServer()
const ws = require('websocket-stream')
const port = 8888

ws.createServer({ server: httpServer }, aedes.handle)

httpServer.listen(port, function () {
  console.log('websocket server listening on port ', port)
})

3.4 Clusters

To use Aedes in a cluster, you have to choose a persistence and an mqemitter that support clusters.

One tested combination is mqemitter-mongodb with aedes-persistence-mongodb. The following example uses it to build a cluster:

const cluster = require('cluster')
const mqemitter = require('mqemitter-mongodb')
const mongoPersistence = require('aedes-persistence-mongodb')

const MONGO_URL = 'mongodb://127.0.0.1/aedes-clusters'

function startAedes () {
  const port = 1883

  const aedes = require('aedes')({
    id: 'BROKER_' + cluster.worker.id,
    mq: mqemitter({
      url: MONGO_URL
    }),
    persistence: mongoPersistence({
      url: MONGO_URL,
      // Optional ttl settings
      ttl: {
        packets: 300, // Number of seconds
        subscriptions: 300
      }
    })
  })

  const server = require('net').createServer(aedes.handle)

  server.listen(port, function () {
    console.log('Aedes listening on port:', port)
    aedes.publish({ topic: 'aedes/hello', payload: "I'm broker " + aedes.id })
  })

  aedes.on('subscribe', function (subscriptions, client) {
    console.log('MQTT client \x1b[32m' + (client ? client.id : client) +
            '\x1b[0m subscribed to topics: ' + subscriptions.map(s => s.topic).join('\n'), 'from broker', aedes.id)
  })

  aedes.on('unsubscribe', function (subscriptions, client) {
    console.log('MQTT client \x1b[32m' + (client ? client.id : client) +
            '\x1b[0m unsubscribed from topics: ' + subscriptions.join('\n'), 'from broker', aedes.id)
  })

  // fired when a client connects
  aedes.on('client', function (client) {
    console.log('Client Connected: \x1b[33m' + (client ? client.id : client) + '\x1b[0m', 'to broker', aedes.id)
  })

  // fired when a client disconnects
  aedes.on('clientDisconnect', function (client) {
    console.log('Client Disconnected: \x1b[31m' + (client ? client.id : client) + '\x1b[0m', 'from broker', aedes.id)
  })

  // fired when a message is published
  aedes.on('publish', function (packet, client) {
    console.log('Client \x1b[31m' + (client ? client.id : aedes.id) + '\x1b[0m has published', packet.payload.toString(), 'on', packet.topic, 'to broker', aedes.id)
  })
}

if (cluster.isMaster) {
  const numWorkers = require('os').cpus().length
  for (let i = 0; i < numWorkers; i++) {
    cluster.fork()
  }

  cluster.on('online', function (worker) {
    console.log('Worker ' + worker.process.pid + ' is online')
  })

  cluster.on('exit', function (worker, code, signal) {
    console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal)
    console.log('Starting a new worker')
    cluster.fork()
  })
} else {
  startAedes()
}
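A note on the `cluster.isMaster` check used above: Node.js 16 added `cluster.isPrimary` and deprecated `cluster.isMaster`. The sketch below shows one way to support both names. The helper `isPrimaryProcess` is a name invented for this example, not part of Node.js or Aedes.

```javascript
const cluster = require('cluster')

// Prefer cluster.isPrimary (Node.js 16+); fall back to cluster.isMaster
// on older versions where isPrimary is undefined.
function isPrimaryProcess () {
  return cluster.isPrimary !== undefined ? cluster.isPrimary : cluster.isMaster
}

if (isPrimaryProcess()) {
  // In the cluster example, this branch forks one worker per CPU.
  console.log('primary process: fork workers here')
} else {
  // In the cluster example, this branch calls startAedes().
  console.log('worker process: start the broker here')
}
```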