commit 702dce0b11345e0938476bc7569b18c11f983ef7 Author: Aiko Mastboom Date: Tue Oct 27 20:38:13 2015 +0100 initial Rx + mqtt test diff --git a/scriptcraft/plugins/mqtt.js b/scriptcraft/plugins/mqtt.js new file mode 100644 index 0000000..ab7697e --- /dev/null +++ b/scriptcraft/plugins/mqtt.js @@ -0,0 +1,64 @@ +/** + * Created by aiko on 25/10/15. + */ + +var mqtt = require('sc-mqtt'); + +var MqttConnectOptions = org.eclipse.paho.client.mqttv3.MqttConnectOptions; + +// create a new client +var client = mqtt.client('tcp://pi2.aiko.sh:1883', 'scriptCraft_aiko_pro'); +// connect to the broker +//client.connect( { keepAliveInterval: 15 } ); +var options = new MqttConnectOptions(); +options.setKeepAliveInterval(15); +client.connect(options); + +var uptime = 'raspberrypiko/status/uptime'; +var cpu_temp = 'raspberrypiko/sensors/temp/cpu'; +client.subscribe(cpu_temp); +client.subscribe(uptime); + +// do something when an incoming message arrives... +var Rx = require('node_modules/rx'); + +var all_messages = Rx.Observable.create(function (observer) { +// override default onMessageArrive method with our own. + client.onMessageArrived(function (topic, message) { + var item = { + topic: topic, + message: JSON.parse(message) + }; + console.log('Rx Message arrived: topic=' + topic + ', message=' + message); + observer.onNext(item); + }); + client.onConnectionLost(function (err) { + observer.onCompleted(); + }); + + // Any cleanup logic might go here + return function () { + console.log('disposed'); + } +}); + +var degrees = all_messages.filter(function (n) { + return n.topic === cpu_temp; +}).distinctUntilChanged(function (n) { + return n.message.degrees; +}); + +var subscription = degrees.subscribe( + function (x) { + console.log('onNext: %s, %s', x.topic, x.message.degrees); + }, + function (e) { + console.log('onError: %s', e); + }, + function () { + console.log('onCompleted'); + } +); + +//subscription.dispose(); +console.log('restarted');