物联网工具Aedes教程1:基本介绍
1.引言
其中,mosca采用nodejs实现,但github上的mosca项目已经停止维护了;
mosca的原班人马重新打造了aedes
, 支持数据持久化和集群,性能更优越;
Node-Red中配置节点:node-red-contrib-aedes
,可以快速启动一个物联网消息中间件,体验一下aedes;
威武网会陆续推出关于Aedes的相关资料,大家一起学习!
2.API介绍
参考文档:
-
new Aedes([options]) / new Aedes.Server([options])
-
aedes.id
-
aedes.connectedClients
-
aedes.closed
-
Event: client
-
Event: clientReady
-
Event: clientDisconnect
-
Event: clientError
-
Event: connectionError
-
Event: keepaliveTimeout
-
Event: publish
-
Event: ack
-
Event: ping
-
Event: subscribe
-
Event: unsubscribe
-
Event: connackSent
-
Event: closed
-
aedes.handle (stream)
-
aedes.subscribe (topic, deliverfunc, callback)
-
aedes.unsubscribe (topic, deliverfunc, callback)
-
aedes.publish (packet, callback)
-
aedes.close ([callback])
-
Handler: decodeProtocol (client, buffer)
-
Handler: preConnect (client, callback)
-
Handler: authenticate (client, username, password, callback)
-
Handler: authorizePublish (client, packet, callback)
-
Handler: authorizeSubscribe (client, subscription, callback)
-
Handler: authorizeForward (client, packet)
-
Handler: published (packet, client, callback)
每种的API的具体使用方法,查阅上面参考文档的github链接.
3.示例
3.1 Simple plain MQTT server
const aedes = require('aedes')() const server = require('net').createServer(aedes.handle) const port = 1883 server.listen(port, function () { console.log('server started and listening on port ', port) })
3.2MQTT over TLS / MQTTS
const fs = require('fs') const aedes = require('aedes')() const port = 8883 const options = { key: fs.readFileSync('YOUR_PRIVATE_KEY_FILE.pem'), cert: fs.readFileSync('YOUR_PUBLIC_CERT_FILE.pem') } const server = require('tls').createServer(options, aedes.handle) server.listen(port, function () { console.log('server started and listening on port ', port) })
3.3MQTT server over WebSocket
const aedes = require('./aedes')() const httpServer = require('http').createServer() const ws = require('websocket-stream') const port = 8888 ws.createServer({ server: httpServer }, aedes.handle) httpServer.listen(port, function () { console.log('websocket server listening on port ', port) })
3.4Clusters
In order to use Aedes in clusters you have to choose a persistence and an mqemitter that supports clusters. Tested persistence/mqemitters that works with clusters are:
使用mqemitter-mongodb
和aedes-persistence-mongodb
构建集群的示例:
const cluster = require('cluster') const mqemitter = require('mqemitter-mongodb') const mongoPersistence = require('aedes-persistence-mongodb') const MONGO_URL = 'mongodb://127.0.0.1/aedes-clusters' function startAedes () { const port = 1883 const aedes = require('aedes')({ id: 'BROKER_' + cluster.worker.id, mq: mqemitter({ url: MONGO_URL }), persistence: mongoPersistence({ url: MONGO_URL, // Optional ttl settings ttl: { packets: 300, // Number of seconds subscriptions: 300 } }) }) const server = require('net').createServer(aedes.handle) server.listen(port, function () { console.log('Aedes listening on port:', port) aedes.publish({ topic: 'aedes/hello', payload: "I'm broker " + aedes.id }) }) aedes.on('subscribe', function (subscriptions, client) { console.log('MQTT client \x1b[32m' + (client ? client.id : client) + '\x1b[0m subscribed to topics: ' + subscriptions.map(s => s.topic).join('\n'), 'from broker', aedes.id) }) aedes.on('unsubscribe', function (subscriptions, client) { console.log('MQTT client \x1b[32m' + (client ? client.id : client) + '\x1b[0m unsubscribed to topics: ' + subscriptions.join('\n'), 'from broker', aedes.id) }) // fired when a client connects aedes.on('client', function (client) { console.log('Client Connected: \x1b[33m' + (client ? client.id : client) + '\x1b[0m', 'to broker', aedes.id) }) // fired when a client disconnects aedes.on('clientDisconnect', function (client) { console.log('Client Disconnected: \x1b[31m' + (client ? client.id : client) + '\x1b[0m', 'to broker', aedes.id) }) // fired when a message is published aedes.on('publish', async function (packet, client) { console.log('Client \x1b[31m' + (client ? client.id : 'BROKER_' + aedes.id) + '\x1b[0m has published', packet.payload.toString(), 'on', packet.topic, 'to broker', aedes.id) }) } if (cluster.isMaster) { const numWorkers = require('os').cpus().length for (let i = 0; i < numWorkers; i++) { cluster.fork() } cluster.on('online', function (worker) { console.log('Worker ' + worker.process.pid + ' is online') }) cluster.on('exit', function (worker, code, signal) { console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal) console.log('Starting a new worker') cluster.fork() }) } else { startAedes() }