/**
 * Created by aiko on 25/10/15.
 *
 * Connects to an MQTT broker, subscribes to two topics, and exposes every
 * incoming message as an Rx observable of `{ topic, message }` items, where
 * `message` is the JSON-decoded payload. A derived stream reports the CPU
 * temperature topic whenever its `degrees` value changes.
 */
var mqtt = require('sc-mqtt');
// Java class reached through the global `org` package object.
var MqttConnectOptions = org.eclipse.paho.client.mqttv3.MqttConnectOptions;

// create a new client
var client = mqtt.client('tcp://pi2.aiko.sh:1883', 'scriptCraft_aiko_pro');

// connect to the broker
//client.connect( { keepAliveInterval: 15 } );
var options = new MqttConnectOptions();
options.setKeepAliveInterval(15);
client.connect(options);

var uptime = 'raspberrypiko/status/uptime';
var cpu_temp = 'raspberrypiko/sensors/temp/cpu';
client.subscribe(cpu_temp);
client.subscribe(uptime);

// do something when an incoming message arrives...
var Rx = require('node_modules/rx');

/**
 * Stream of every message delivered to `client`.
 *
 * The client callbacks are installed when an observer subscribes. Each item
 * is `{ topic: <topic>, message: <JSON-decoded payload> }`. The stream
 * completes when the client reports that the connection was lost.
 */
var all_messages = Rx.Observable.create(function (observer) {
    // override default onMessageArrive method with our own.
    client.onMessageArrived(function (topic, message) {
        var payload;
        try {
            payload = JSON.parse(message);
        } catch (e) {
            // A payload that is not valid JSON must not throw out of this
            // callback: the exception would propagate into the MQTT client
            // (NOTE(review): Paho documents that an exception from
            // messageArrived shuts the client down -- confirm sc-mqtt does
            // not catch it). Report the message and skip it instead.
            console.log('Rx Message ignored, payload is not valid JSON: topic=' + topic +
                ', message=' + message + ', error=' + e);
            return;
        }

        var item = {
            topic: topic,
            message: payload
        };
        console.log('Rx Message arrived: topic=' + topic + ', message=' + message);
        observer.onNext(item);
    });

    client.onConnectionLost(function (err) {
        // Surface the cause instead of discarding it, then end the stream
        // exactly as before.
        console.log('Rx connection lost: ' + err);
        observer.onCompleted();
    });

    // Any cleanup logic might go here
    return function () {
        console.log('disposed');
    };
});

// CPU temperature messages only, emitted when `degrees` differs from the
// previously emitted value.
var degrees = all_messages.filter(function (n) {
    return n.topic === cpu_temp;
}).distinctUntilChanged(function (n) {
    return n.message.degrees;
});

var subscription = degrees.subscribe(
    function (x) {
        console.log('onNext: %s, %s', x.topic, x.message.degrees);
    },
    function (e) {
        console.log('onError: %s', e);
    },
    function () {
        console.log('onCompleted');
    }
);

//subscription.dispose();

console.log('restarted');