packages/skygear-core/lib/pubsub.js
/**
* Copyright 2015 Oursky Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const _ = require('lodash');
const _ws = require('websocket');
// Choose the WebSocket constructor used by the pubsub clients: the
// W3C-compatible client exported by the `websocket` module when that module
// resolved to something truthy, otherwise the global exposed on `window`.
let WebSocket = _ws ? _ws.w3cwebsocket : window.WebSocket; //eslint-disable-line
const url = require('url');
const ee = require('event-emitter');
import {EventHandle} from './util';
// Event names used on each Pubsub client's internal emitter: ON_OPEN is
// emitted when the websocket opens and ON_CLOSE when it closes. Listeners
// are attached through Pubsub#onOpen / Pubsub#onClose.
const ON_OPEN = 'onOpen';
const ON_CLOSE = 'onClose';
/**
 * The Pubsub client.
 *
 * Wraps a single WebSocket connection to the pubsub endpoint derived from the
 * container's `endPoint`. Channel handlers are kept in memory so they can be
 * re-subscribed every time the socket (re)opens, and messages published while
 * the socket is not open are queued and flushed on the next open.
 */
export class Pubsub {

  /**
   * Constructs a new Pubsub object.
   *
   * @param {container} container - the Skygear container
   * @param {Boolean} internal - true if it is an internal pubsub client
   * @return {Pubsub} pubsub client
   */
  constructor(container, internal = false) {
    this._container = container;
    this._ws = null;
    this._internal = internal;
    // Messages published while the socket is not open; flushed in _onopen().
    this._queue = [];
    this._ee = ee({});
    // Map of channel name -> array of handler functions.
    this._handlers = {};
    // Base reconnect delay in milliseconds, multiplied by the retry count.
    this._reconnectWait = 5000;
    this._retryCount = 0;
    // Handle of the pending reconnect timer, or null when none is scheduled.
    this._reconnectTimer = null;
  }

  /**
   * Registers a connection open listener
   *
   * @param {function()} listener - the listener
   * @return {EventHandle} handle wrapping the emitter, event name and listener
   */
  onOpen(listener) {
    this._ee.on(ON_OPEN, listener);
    return new EventHandle(this._ee, ON_OPEN, listener);
  }

  /**
   * Registers a connection close listener
   *
   * @param {function()} listener - the listener
   * @return {EventHandle} handle wrapping the emitter, event name and listener
   */
  onClose(listener) {
    this._ee.on(ON_CLOSE, listener);
    return new EventHandle(this._ee, ON_CLOSE, listener);
  }

  /**
   * Builds the websocket URL from the container end point.
   *
   * `https:` end points map to `wss:`, everything else to `ws:`. The pubsub
   * path (`/pubsub`, or `/_/pubsub` for the internal client) is appended to
   * the end point path without doubling the slash, and the API key is passed
   * as the `api_key` query parameter.
   *
   * @param {Boolean} internal - true to target the internal pubsub path
   * @return {String} the websocket URL
   * @private
   */
  _pubsubUrl(internal = false) {
    const parsedUrl = url.parse(this._container.endPoint);
    const protocol = parsedUrl.protocol === 'https:' ? 'wss:' : 'ws:';
    const basePath = parsedUrl.path;
    let pubsubPath = internal ? '/_/pubsub' : '/pubsub';
    if (basePath.endsWith('/')) {
      // The end point already ends with a slash; drop ours.
      pubsubPath = pubsubPath.substring(1);
    }
    // Encode the key so characters such as `&`, `=` or `#` cannot corrupt
    // the query string.
    const queryString =
      '?api_key=' + encodeURIComponent(this._container.apiKey);
    return protocol + '//' + parsedUrl.host + basePath + pubsubPath +
      queryString;
  }

  /**
   * @return {Boolean} true if the container has an API key
   * @private
   */
  _hasCredentials() {
    return !!this._container.apiKey;
  }

  /**
   * Connects to server if the Skygear container has credential, otherwise
   * close the connection.
   */
  reconfigure() {
    if (!this._hasCredentials()) {
      this.close();
      return;
    }
    this.connect();
  }

  /**
   * Runs when the socket opens: notifies onOpen listeners, re-subscribes
   * every channel that has handlers, then flushes the publish queue.
   *
   * @private
   */
  _onopen() {
    // Trigger registered onOpen callbacks
    this._ee.emit(ON_OPEN, true);

    // Resubscribe previously subscribed channels
    Object.keys(this._handlers).forEach((channel) => {
      this._sendSubscription(channel);
    });

    // Flush queued messages to the server
    this._queue.forEach((data) => {
      this._ws.send(JSON.stringify(data));
    });
    this._queue = [];
  }

  /**
   * Dispatches a parsed message to the handlers of its channel.
   *
   * @param {Object} data - parsed message with `channel` and `data` fields
   * @private
   */
  _onmessage(data) {
    // Valid JSON such as `null` carries no channel; ignore it instead of
    // throwing inside the socket callback.
    if (data === null || data === undefined) {
      return;
    }
    const handlers = this._handlers[data.channel];
    if (!handlers) {
      return;
    }
    handlers.forEach((handler) => {
      handler(data.data);
    });
  }

  /**
   * Subscribes a function callback on receiving message at the specified
   * channel. Alias of {@link Pubsub#subscribe}.
   *
   * @param {string} channel - name of the channel to subscribe
   * @param {function(object:*)} callback - function to be trigger with
   * incoming data
   * @return {function(object:*)} The callback function
   **/
  on(channel, callback) {
    return this.subscribe(channel, callback);
  }

  /**
   * Subscribes the channel for just one message.
   *
   * This function takes one message off from a pubsub channel,
   * returning a promise of that message. When a message
   * is received from the channel, the temporary handler is unsubscribed.
   *
   * @param {string} channel - name of the channel
   * @return {Promise<Object>} promise of next message in this channel
   */
  once(channel) {
    return new Promise((resolve) => {
      const handler = (data) => {
        this.unsubscribe(channel, handler);
        resolve(data);
      };
      this.subscribe(channel, handler);
    });
  }

  /**
   * Publishes message to a channel.
   *
   * The message is sent immediately when connected; otherwise it is queued
   * and sent the next time the socket opens.
   *
   * @param {String} channel - name of the channel
   * @param {Object} data - data to be published
   * @throws {Error} if the channel is missing or data is not a plain
   * (non-null, non-array) object
   */
  publish(channel, data) {
    if (!channel) {
      throw new Error('Missing channel to publish');
    }
    if (typeof data !== 'object' || data === null || Array.isArray(data)) {
      throw new Error('Data must be object');
    }

    const publishData = {
      action: 'pub',
      channel,
      data
    };
    if (this.connected) {
      this._ws.send(JSON.stringify(publishData));
    } else {
      this._queue.push(publishData);
    }
  }

  /**
   * Sends a `sub` action for the channel when connected; does nothing
   * otherwise (channels are re-subscribed in _onopen()).
   *
   * @param {String} channel - name of the channel
   * @private
   */
  _sendSubscription(channel) {
    if (this.connected) {
      const data = {
        action: 'sub',
        channel: channel
      };
      this._ws.send(JSON.stringify(data));
    }
  }

  /**
   * Sends an `unsub` action for the channel when connected; does nothing
   * otherwise.
   *
   * @param {String} channel - name of the channel
   * @private
   */
  _sendRemoveSubscription(channel) {
    if (this.connected) {
      const data = {
        action: 'unsub',
        channel: channel
      };
      this._ws.send(JSON.stringify(data));
    }
  }

  /**
   * Unsubscribes a function callback on the specified channel. Alias of
   * {@link Pubsub#unsubscribe}.
   *
   * If pass in `callback` is null, all callbacks in the specified channel
   * will be removed.
   *
   * @param {string} channel - name of the channel to unsubscribe
   * @param {function(object:*)=} callback - function to be trigger with
   * incoming data
   **/
  off(channel, callback = null) {
    this.unsubscribe(channel, callback);
  }

  /**
   * Subscribes a function callback on receiving message at the specified
   * channel.
   *
   * The `sub` action is only sent for the first handler of a channel.
   *
   * @param {string} channel - name of the channel to subscribe
   * @param {function(object:*)} handler - function to be trigger with
   * incoming data
   * @return {function(object:*)} The callback function
   * @throws {Error} if the channel is missing
   **/
  subscribe(channel, handler) {
    if (!channel) {
      throw new Error('Missing channel to subscribe');
    }

    const alreadyExists = this.hasHandlers(channel);
    this._register(channel, handler);
    if (!alreadyExists) {
      this._sendSubscription(channel);
    }
    return handler;
  }

  /**
   * Unsubscribes a function callback on the specified channel.
   *
   * If pass in `handler` is null, all callbacks in the specified channel
   * will be removed. The `unsub` action is sent once the channel has no
   * handlers left.
   *
   * @param {string} channel - name of the channel to unsubscribe
   * @param {function(object:*)=} [handler] - function to be trigger with
   * incoming data
   * @throws {Error} if the channel is missing
   **/
  unsubscribe(channel, handler = null) {
    if (!channel) {
      throw new Error('Missing channel to unsubscribe');
    }
    if (!this.hasHandlers(channel)) {
      return;
    }

    // _unregister() replaces the stored array rather than mutating it, so
    // iterating the current array while removing is safe.
    const handlersToRemove = handler ? [handler] : this._handlers[channel];
    handlersToRemove.forEach((handlerToRemove) => {
      this._unregister(channel, handlerToRemove);
    });

    if (!this.hasHandlers(channel)) {
      this._sendRemoveSubscription(channel);
    }
  }

  /**
   * Checks if the channel is subscribed with any handler.
   *
   * @param {String} channel - name of the channel
   * @return {Boolean} true if the channel has handlers
   */
  hasHandlers(channel) {
    const handlers = this._handlers[channel];
    return handlers ? handlers.length > 0 : false;
  }

  /**
   * Appends the handler to the channel's handler list, creating the list
   * when needed.
   *
   * @param {String} channel - name of the channel
   * @param {function(object:*)} handler - handler to add
   * @private
   */
  _register(channel, handler) {
    if (!this._handlers[channel]) {
      this._handlers[channel] = [];
    }
    this._handlers[channel].push(handler);
  }

  /**
   * Removes every occurrence of the handler from the channel and drops the
   * channel entry when no handler remains.
   *
   * @param {String} channel - name of the channel
   * @param {function(object:*)} handler - handler to remove
   * @private
   */
  _unregister(channel, handler) {
    const remaining = (this._handlers[channel] || []).filter(
      (item) => item !== handler
    );
    if (remaining.length > 0) {
      this._handlers[channel] = remaining;
    } else {
      delete this._handlers[channel];
    }
  }

  /**
   * Schedules a reconnect attempt.
   *
   * The delay is `_reconnectWait * _retryCount` milliseconds capped at
   * 60000, so the first retry after a successful open is immediate. Any
   * previously scheduled attempt is replaced so timers never stack.
   *
   * @private
   */
  _reconnect() {
    const interval = Math.min(this._reconnectWait * this._retryCount, 60000);
    this._cancelReconnect();
    this._reconnectTimer = setTimeout(() => {
      this._reconnectTimer = null;
      this._retryCount += 1;
      this.connect();
    }, interval);
  }

  /**
   * Cancels the pending reconnect attempt, if any.
   *
   * @private
   */
  _cancelReconnect() {
    if (this._reconnectTimer !== null && this._reconnectTimer !== undefined) {
      clearTimeout(this._reconnectTimer);
    }
    this._reconnectTimer = null;
  }

  /**
   * True if it is connected to the server, i.e. a socket exists and its
   * `readyState` is 1 (OPEN).
   *
   * @type {Boolean}
   */
  get connected() {
    return !!this._ws && this._ws.readyState === 1;
  }

  /**
   * Closes connection and clear all handlers.
   */
  reset() {
    this.close();
    this._handlers = {};
  }

  /**
   * Closes connection.
   *
   * Also cancels any scheduled reconnect attempt so that an explicit close
   * is not undone by a timer started when the socket dropped earlier.
   */
  close() {
    this._cancelReconnect();
    if (this._ws) {
      // Detach our callbacks first so closing does not trigger _reconnect().
      this._clearWebSocket();
      this._ws.close();
      this._ee.emit(ON_CLOSE, false);
      this._ws = null;
    }
  }

  /**
   * The WebSocket constructor used to open connections.
   *
   * @type {WebSocket}
   */
  get WebSocket() {
    return WebSocket;
  }

  /**
   * Stores the socket and attaches the open/close/message callbacks.
   *
   * @param {WebSocket} ws - the socket to use; a falsy value only clears
   * the stored reference
   * @private
   */
  _setWebSocket(ws) {
    const emitter = this._ee;
    this._ws = ws;

    if (!this._ws) {
      return;
    }

    this._ws.onopen = () => {
      this._retryCount = 0;
      this._onopen();
    };
    this._ws.onclose = () => {
      emitter.emit(ON_CLOSE, false);
      this._reconnect();
    };
    this._ws.onmessage = (evt) => {
      let message;
      try {
        message = JSON.parse(evt.data);
      } catch (e) {
        console.log('Got malformed websocket data:', evt.data);
        return;
      }
      this._onmessage(message);
    };
  }

  /**
   * Detaches the open/close/message callbacks from the current socket.
   *
   * @private
   */
  _clearWebSocket() {
    if (!this._ws) {
      return;
    }

    this._ws.onopen = null;
    this._ws.onclose = null;
    this._ws.onmessage = null;
  }

  /**
   * Connects to server if the Skygear container has credentials and not
   * connected.
   *
   * A socket that is being replaced (for example one still connecting) has
   * its callbacks detached and is closed, so it cannot linger as an orphaned
   * connection. Any pending reconnect timer is cancelled because this call
   * supersedes it.
   */
  connect() {
    if (!this._hasCredentials() || this.connected) {
      return;
    }

    const pubsubUrl = this._pubsubUrl(this._internal);

    this._cancelReconnect();

    // The old websocket will still call our _onopen and we will try to send
    // message with the new websocket, whose readyState may not be OPEN.
    // Therefore, we need to clear the websocket
    const stale = this._ws;
    this._clearWebSocket();
    // readyState 3 is CLOSED; anything else may still hold a connection.
    if (stale && stale.readyState !== 3 && typeof stale.close === 'function') {
      stale.close();
    }

    const ws = new this.WebSocket(pubsubUrl);
    this._setWebSocket(ws);
  }

}
/**
 * Pubsub container
 *
 * A publish-subscribe interface, providing real-time message-based
 * communication with other users. It owns two Pubsub clients: a public one
 * that backs the methods below, and an internal one used for the device's
 * `_sub_<deviceID>` channel.
 */
export class PubsubContainer {

  /**
   * @param {Container} container - the Skygear container
   * @return {PubsubContainer}
   */
  constructor(container) {
    /**
     * @private
     */
    this.container = container;

    this._pubsub = new Pubsub(this.container, false);
    this._internalPubsub = new Pubsub(this.container, true);

    /**
     * Indicating if the pubsub client should connect to server automatically.
     *
     * @type {Boolean}
     */
    this.autoPubsub = true;
  }

  /**
   * Subscribes a function callback on receiving message at the specified
   * channel.
   *
   * @param {string} channel - name of the channel to subscribe
   * @param {function(object:*)} callback - function to be trigger with
   * incoming data
   * @return {function(object:*)} The callback function
   **/
  on(channel, callback) {
    return this._pubsub.on(channel, callback);
  }

  /**
   * Unsubscribes a function callback on the specified channel.
   *
   * If pass in `callback` is null, all callbacks in the specified channel
   * will be removed.
   *
   * @param {string} channel - name of the channel to unsubscribe
   * @param {function(object:*)=} callback - function to be trigger with
   * incoming data
   **/
  off(channel, callback = null) {
    this._pubsub.off(channel, callback);
  }

  /**
   * Subscribes the channel for just one message.
   *
   * This function takes one message off from a pubsub channel,
   * returning a promise of that message. When a message
   * is received from the channel, the channel will be unsubscribed.
   *
   * @param {string} channel - name of the channel
   * @return {Promise<Object>} promise of next message in this channel
   */
  once(channel) {
    return this._pubsub.once(channel);
  }

  /**
   * Registers listener on connection between pubsub client and server is open.
   *
   * @param {function()} listener - function to be triggered when connection
   * open
   * @return {*} whatever the public client's `onOpen` returns, so the caller
   * can keep it to manage the listener
   */
  onOpen(listener) {
    return this._pubsub.onOpen(listener);
  }

  /**
   * Registers listener on connection between pubsub client and server is
   * closed.
   *
   * @param {function()} listener - function to be triggered when connection
   * closed
   * @return {*} whatever the public client's `onClose` returns, so the
   * caller can keep it to manage the listener
   */
  onClose(listener) {
    return this._pubsub.onClose(listener);
  }

  /**
   * Publishes message to a channel.
   *
   * @param {String} channel - name of the channel
   * @param {Object} data - data to be published
   */
  publish(channel, data) {
    this._pubsub.publish(channel, data);
  }

  /**
   * Checks if the channel is subscribed with any handler.
   *
   * @param {String} channel - name of the channel
   * @return {Boolean} true if the channel has handlers
   */
  hasHandlers(channel) {
    // The result must be returned; without it callers always saw undefined.
    return this._pubsub.hasHandlers(channel);
  }

  /**
   * Device ID read from the container's push module.
   *
   * @private
   */
  get deviceID() {
    return this.container.push.deviceID;
  }

  /**
   * Calls reconfigure() unless `autoPubsub` has been turned off.
   *
   * @private
   */
  _reconfigurePubsubIfNeeded() {
    if (!this.autoPubsub) {
      return;
    }

    this.reconfigure();
  }

  /**
   * Connects to server if the Skygear container has credential, otherwise
   * close the connection.
   *
   * The internal client is reset first (closing it and dropping its
   * handlers) and, when a device ID is available, re-subscribed to the
   * `_sub_<deviceID>` channel before both clients are reconfigured.
   */
  reconfigure() {
    this._internalPubsub.reset();
    if (this.deviceID !== null) {
      this._internalPubsub.subscribe('_sub_' + this.deviceID, function (data) {
        console.log('Receivied data for subscription: ' + data);
      });
    }
    this._internalPubsub.reconfigure();
    this._pubsub.reconfigure();
  }

}