From be472962f016ce1836f54f741f494a38a4b21842 Mon Sep 17 00:00:00 2001
From: NorthLan <6995syu@163.com>
Date: Tue, 19 Apr 2022 16:53:04 +0800
Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E4=BD=BF=E7=94=A8?=
=?UTF-8?q?=E7=A4=BA=E4=BE=8B?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
examples/cluster/.gitkeep | 0
examples/singleon/chat/README.md | 15 +
examples/singleon/chat/main.go | 161 ++++++
examples/singleon/chat/web/index.html | 69 +++
examples/singleon/chat/web/protocol.js | 349 +++++++++++
examples/singleon/chat/web/starx-wsclient.js | 573 +++++++++++++++++++
6 files changed, 1167 insertions(+)
create mode 100644 examples/cluster/.gitkeep
create mode 100644 examples/singleon/chat/README.md
create mode 100644 examples/singleon/chat/main.go
create mode 100644 examples/singleon/chat/web/index.html
create mode 100644 examples/singleon/chat/web/protocol.js
create mode 100644 examples/singleon/chat/web/starx-wsclient.js
diff --git a/examples/cluster/.gitkeep b/examples/cluster/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/examples/singleon/chat/README.md b/examples/singleon/chat/README.md
new file mode 100644
index 0000000..785c15f
--- /dev/null
+++ b/examples/singleon/chat/README.md
@@ -0,0 +1,15 @@
+# starx-chat-demo
+chat room demo base on [ngs](https://git.noahlan.cn/noahlan/ngs.git) in 100 lines
+
+refs: https://git.noahlan.cn/noahlan/ngs
+
+## Required
+- golang
+- websocket
+
+## Run
+```
+go run main.go
+```
+
+open browser => http://localhost:3250/web/
\ No newline at end of file
diff --git a/examples/singleon/chat/main.go b/examples/singleon/chat/main.go
new file mode 100644
index 0000000..9ae2570
--- /dev/null
+++ b/examples/singleon/chat/main.go
@@ -0,0 +1,161 @@
+package main
+
+import (
+ "fmt"
+ "git.noahlan.cn/northlan/ngs"
+ "git.noahlan.cn/northlan/ngs/component"
+ "git.noahlan.cn/northlan/ngs/pipeline"
+ "git.noahlan.cn/northlan/ngs/scheduler"
+ "git.noahlan.cn/northlan/ngs/serialize/json"
+ "git.noahlan.cn/northlan/ngs/session"
+ "log"
+ "net/http"
+ "strings"
+ "time"
+)
+
+type (
+ Room struct {
+ group *ngs.Group
+ }
+
+ // RoomManager represents a component that contains a bundle of room
+ RoomManager struct {
+ component.Base
+ timer *scheduler.Timer
+ rooms map[int]*Room
+ }
+
+ // UserMessage represents a message that user sent
+ UserMessage struct {
+ Name string `json:"name"`
+ Content string `json:"content"`
+ }
+
+ // NewUser message will be received when new user join room
+ NewUser struct {
+ Content string `json:"content"`
+ }
+
+ // AllMembers contains all members uid
+ AllMembers struct {
+ Members []int64 `json:"members"`
+ }
+
+ // JoinResponse represents the result of joining room
+ JoinResponse struct {
+ Code int `json:"code"`
+ Result string `json:"result"`
+ }
+
+ stats struct {
+ component.Base
+ timer *scheduler.Timer
+ outboundBytes int
+ inboundBytes int
+ }
+)
+
+func (stats *stats) outbound(s *session.Session, msg *pipeline.Message) error {
+ stats.outboundBytes += len(msg.Data)
+ return nil
+}
+
+func (stats *stats) inbound(s *session.Session, msg *pipeline.Message) error {
+ stats.inboundBytes += len(msg.Data)
+ return nil
+}
+
+func (stats *stats) AfterInit() {
+ stats.timer = scheduler.NewTimer(time.Minute, func() {
+ println("OutboundBytes", stats.outboundBytes)
+ println("InboundBytes", stats.outboundBytes)
+ })
+}
+
+const (
+ testRoomID = 1
+ roomIDKey = "ROOM_ID"
+)
+
+func NewRoomManager() *RoomManager {
+ return &RoomManager{
+ rooms: map[int]*Room{},
+ }
+}
+
+// AfterInit component lifetime callback
+func (mgr *RoomManager) AfterInit() {
+ session.Lifetime.OnClosed(func(s *session.Session) {
+ if !s.HasKey(roomIDKey) {
+ return
+ }
+ room := s.Value(roomIDKey).(*Room)
+ room.group.Leave(s)
+ })
+ mgr.timer = scheduler.NewTimer(time.Minute, func() {
+ for roomId, room := range mgr.rooms {
+ println(fmt.Sprintf("UserCount: RoomID=%d, Time=%s, Count=%d",
+ roomId, time.Now().String(), room.group.Count()))
+ }
+ })
+}
+
+// Join room
+func (mgr *RoomManager) Join(s *session.Session, msg []byte) error {
+ // NOTE: join test room only in demo
+ room, found := mgr.rooms[testRoomID]
+ if !found {
+ room = &Room{
+ group: ngs.NewGroup(fmt.Sprintf("room-%d", testRoomID)),
+ }
+ mgr.rooms[testRoomID] = room
+ }
+
+ fakeUID := s.ID() //just use s.ID as uid !!!
+ s.Bind(fakeUID) // binding session uids.Set(roomIDKey, room)
+ s.Set(roomIDKey, room)
+ s.Push("onMembers", &AllMembers{Members: room.group.Members()})
+ // notify others
+ room.group.Broadcast("onNewUser", &NewUser{Content: fmt.Sprintf("New user: %d", s.ID())})
+ // new user join group
+ room.group.Add(s) // add session to group
+ return s.Response(&JoinResponse{Result: "success"})
+}
+
+// Message sync last message to all members
+func (mgr *RoomManager) Message(s *session.Session, msg *UserMessage) error {
+ if !s.HasKey(roomIDKey) {
+ return fmt.Errorf("not join room yet")
+ }
+ room := s.Value(roomIDKey).(*Room)
+ return room.group.Broadcast("onMessage", msg)
+}
+
+func main() {
+ components := &component.Components{}
+ components.Register(
+ NewRoomManager(),
+ component.WithName("room"), // rewrite component and handler name
+ component.WithNameFunc(strings.ToLower),
+ )
+
+ // traffic stats
+ pip := pipeline.New()
+ var stats = &stats{}
+ pip.Outbound().PushBack(stats.outbound)
+ pip.Inbound().PushBack(stats.inbound)
+
+ log.SetFlags(log.LstdFlags | log.Llongfile)
+ http.Handle("/web/", http.StripPrefix("/web/", http.FileServer(http.Dir("web"))))
+
+ ngs.Listen(":3250",
+ ngs.WithIsWebsocket(true),
+ ngs.WithPipeline(pip),
+ ngs.WithCheckOriginFunc(func(_ *http.Request) bool { return true }),
+ ngs.WithWSPath("/nano"),
+ ngs.WithDebugMode(),
+ ngs.WithSerializer(json.NewSerializer()), // override default serializer
+ ngs.WithComponents(components),
+ )
+}
diff --git a/examples/singleon/chat/web/index.html b/examples/singleon/chat/web/index.html
new file mode 100644
index 0000000..36eb6e5
--- /dev/null
+++ b/examples/singleon/chat/web/index.html
@@ -0,0 +1,69 @@
+
+
+
+
+ Chat Demo
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/examples/singleon/chat/web/protocol.js b/examples/singleon/chat/web/protocol.js
new file mode 100644
index 0000000..b16420d
--- /dev/null
+++ b/examples/singleon/chat/web/protocol.js
@@ -0,0 +1,349 @@
+(function (exports, ByteArray, global) {
+ var Protocol = exports;
+
+ var PKG_HEAD_BYTES = 4;
+ var MSG_FLAG_BYTES = 1;
+ var MSG_ROUTE_CODE_BYTES = 2;
+ var MSG_ID_MAX_BYTES = 5;
+ var MSG_ROUTE_LEN_BYTES = 1;
+
+ var MSG_ROUTE_CODE_MAX = 0xffff;
+
+ var MSG_COMPRESS_ROUTE_MASK = 0x1;
+ var MSG_TYPE_MASK = 0x7;
+
+ var Package = Protocol.Package = {};
+ var Message = Protocol.Message = {};
+
+ Package.TYPE_HANDSHAKE = 1;
+ Package.TYPE_HANDSHAKE_ACK = 2;
+ Package.TYPE_HEARTBEAT = 3;
+ Package.TYPE_DATA = 4;
+ Package.TYPE_KICK = 5;
+
+ Message.TYPE_REQUEST = 0;
+ Message.TYPE_NOTIFY = 1;
+ Message.TYPE_RESPONSE = 2;
+ Message.TYPE_PUSH = 3;
+
+ /**
+ * pomele client encode
+ * id message id;
+ * route message route
+ * msg message body
+ * socketio current support string
+ */
+ Protocol.strencode = function(str) {
+ var byteArray = new ByteArray(str.length * 3);
+ var offset = 0;
+ for(var i = 0; i < str.length; i++){
+ var charCode = str.charCodeAt(i);
+ var codes = null;
+ if(charCode <= 0x7f){
+ codes = [charCode];
+ }else if(charCode <= 0x7ff){
+ codes = [0xc0|(charCode>>6), 0x80|(charCode & 0x3f)];
+ }else{
+ codes = [0xe0|(charCode>>12), 0x80|((charCode & 0xfc0)>>6), 0x80|(charCode & 0x3f)];
+ }
+ for(var j = 0; j < codes.length; j++){
+ byteArray[offset] = codes[j];
+ ++offset;
+ }
+ }
+ var _buffer = new ByteArray(offset);
+ copyArray(_buffer, 0, byteArray, 0, offset);
+ return _buffer;
+ };
+
+ /**
+ * client decode
+ * msg String data
+ * return Message Object
+ */
+ Protocol.strdecode = function(buffer) {
+ var bytes = new ByteArray(buffer);
+ var array = [];
+ var offset = 0;
+ var charCode = 0;
+ var end = bytes.length;
+ while(offset < end){
+ if(bytes[offset] < 128){
+ charCode = bytes[offset];
+ offset += 1;
+ }else if(bytes[offset] < 224){
+ charCode = ((bytes[offset] & 0x3f)<<6) + (bytes[offset+1] & 0x3f);
+ offset += 2;
+ }else{
+ charCode = ((bytes[offset] & 0x0f)<<12) + ((bytes[offset+1] & 0x3f)<<6) + (bytes[offset+2] & 0x3f);
+ offset += 3;
+ }
+ array.push(charCode);
+ }
+ return String.fromCharCode.apply(null, array);
+ };
+
+ /**
+ * Package protocol encode.
+ *
+ * Pomelo package format:
+ * +------+-------------+------------------+
+ * | type | body length | body |
+ * +------+-------------+------------------+
+ *
+ * Head: 4bytes
+ * 0: package type,
+ * 1 - handshake,
+ * 2 - handshake ack,
+ * 3 - heartbeat,
+ * 4 - data
+ * 5 - kick
+ * 1 - 3: big-endian body length
+ * Body: body length bytes
+ *
+ * @param {Number} type package type
+ * @param {ByteArray} body body content in bytes
+ * @return {ByteArray} new byte array that contains encode result
+ */
+ Package.encode = function(type, body){
+ var length = body ? body.length : 0;
+ var buffer = new ByteArray(PKG_HEAD_BYTES + length);
+ var index = 0;
+ buffer[index++] = type & 0xff;
+ buffer[index++] = (length >> 16) & 0xff;
+ buffer[index++] = (length >> 8) & 0xff;
+ buffer[index++] = length & 0xff;
+ if(body) {
+ copyArray(buffer, index, body, 0, length);
+ }
+ return buffer;
+ };
+
+ /**
+ * Package protocol decode.
+ * See encode for package format.
+ *
+ * @param {ByteArray} buffer byte array containing package content
+ * @return {Object} {type: package type, buffer: body byte array}
+ */
+ Package.decode = function(buffer){
+ var offset = 0;
+ var bytes = new ByteArray(buffer);
+ var length = 0;
+ var rs = [];
+ while(offset < bytes.length) {
+ var type = bytes[offset++];
+ length = ((bytes[offset++]) << 16 | (bytes[offset++]) << 8 | bytes[offset++]) >>> 0;
+ var body = length ? new ByteArray(length) : null;
+ copyArray(body, 0, bytes, offset, length);
+ offset += length;
+ rs.push({'type': type, 'body': body});
+ }
+ return rs.length === 1 ? rs[0]: rs;
+ };
+
+ /**
+ * Message protocol encode.
+ *
+ * @param {Number} id message id
+ * @param {Number} type message type
+ * @param {Number} compressRoute whether compress route
+ * @param {Number|String} route route code or route string
+ * @param {Buffer} msg message body bytes
+ * @return {Buffer} encode result
+ */
+ Message.encode = function(id, type, compressRoute, route, msg){
+ // caculate message max length
+ var idBytes = msgHasId(type) ? caculateMsgIdBytes(id) : 0;
+ var msgLen = MSG_FLAG_BYTES + idBytes;
+
+ if(msgHasRoute(type)) {
+ if(compressRoute) {
+ if(typeof route !== 'number'){
+ throw new Error('error flag for number route!');
+ }
+ msgLen += MSG_ROUTE_CODE_BYTES;
+ } else {
+ msgLen += MSG_ROUTE_LEN_BYTES;
+ if(route) {
+ route = Protocol.strencode(route);
+ if(route.length>255) {
+ throw new Error('route maxlength is overflow');
+ }
+ msgLen += route.length;
+ }
+ }
+ }
+
+ if(msg) {
+ msgLen += msg.length;
+ }
+
+ var buffer = new ByteArray(msgLen);
+ var offset = 0;
+
+ // add flag
+ offset = encodeMsgFlag(type, compressRoute, buffer, offset);
+
+ // add message id
+ if(msgHasId(type)) {
+ offset = encodeMsgId(id, buffer, offset);
+ }
+
+ // add route
+ if(msgHasRoute(type)) {
+ offset = encodeMsgRoute(compressRoute, route, buffer, offset);
+ }
+
+ // add body
+ if(msg) {
+ offset = encodeMsgBody(msg, buffer, offset);
+ }
+
+ return buffer;
+ };
+
+ /**
+ * Message protocol decode.
+ *
+ * @param {Buffer|Uint8Array} buffer message bytes
+ * @return {Object} message object
+ */
+ Message.decode = function(buffer) {
+ var bytes = new ByteArray(buffer);
+ var bytesLen = bytes.length || bytes.byteLength;
+ var offset = 0;
+ var id = 0;
+ var route = null;
+
+ // parse flag
+ var flag = bytes[offset++];
+ var compressRoute = flag & MSG_COMPRESS_ROUTE_MASK;
+ var type = (flag >> 1) & MSG_TYPE_MASK;
+
+ // parse id
+ if(msgHasId(type)) {
+ var m = parseInt(bytes[offset]);
+ var i = 0;
+ do{
+ var m = parseInt(bytes[offset]);
+ id = id + ((m & 0x7f) * Math.pow(2,(7*i)));
+ offset++;
+ i++;
+ }while(m >= 128);
+ }
+
+ // parse route
+ if(msgHasRoute(type)) {
+ if(compressRoute) {
+ route = (bytes[offset++]) << 8 | bytes[offset++];
+ } else {
+ var routeLen = bytes[offset++];
+ if(routeLen) {
+ route = new ByteArray(routeLen);
+ copyArray(route, 0, bytes, offset, routeLen);
+ route = Protocol.strdecode(route);
+ } else {
+ route = '';
+ }
+ offset += routeLen;
+ }
+ }
+
+ // parse body
+ var bodyLen = bytesLen - offset;
+ var body = new ByteArray(bodyLen);
+
+ copyArray(body, 0, bytes, offset, bodyLen);
+
+ return {'id': id, 'type': type, 'compressRoute': compressRoute,
+ 'route': route, 'body': body};
+ };
+
+ var copyArray = function(dest, doffset, src, soffset, length) {
+ if('function' === typeof src.copy) {
+ // Buffer
+ src.copy(dest, doffset, soffset, soffset + length);
+ } else {
+ // Uint8Array
+ for(var index=0; index>= 7;
+ } while(id > 0);
+ return len;
+ };
+
+ var encodeMsgFlag = function(type, compressRoute, buffer, offset) {
+ if(type !== Message.TYPE_REQUEST && type !== Message.TYPE_NOTIFY &&
+ type !== Message.TYPE_RESPONSE && type !== Message.TYPE_PUSH) {
+ throw new Error('unkonw message type: ' + type);
+ }
+
+ buffer[offset] = (type << 1) | (compressRoute ? 1 : 0);
+
+ return offset + MSG_FLAG_BYTES;
+ };
+
+ var encodeMsgId = function(id, buffer, offset) {
+ do{
+ var tmp = id % 128;
+ var next = Math.floor(id/128);
+
+ if(next !== 0){
+ tmp = tmp + 128;
+ }
+ buffer[offset++] = tmp;
+
+ id = next;
+ } while(id !== 0);
+
+ return offset;
+ };
+
+ var encodeMsgRoute = function(compressRoute, route, buffer, offset) {
+ if (compressRoute) {
+ if(route > MSG_ROUTE_CODE_MAX){
+ throw new Error('route number is overflow');
+ }
+
+ buffer[offset++] = (route >> 8) & 0xff;
+ buffer[offset++] = route & 0xff;
+ } else {
+ if(route) {
+ buffer[offset++] = route.length & 0xff;
+ copyArray(buffer, offset, route, 0, route.length);
+ offset += route.length;
+ } else {
+ buffer[offset++] = 0;
+ }
+ }
+
+ return offset;
+ };
+
+ var encodeMsgBody = function(msg, buffer, offset) {
+ copyArray(buffer, offset, msg, 0, msg.length);
+ return offset + msg.length;
+ };
+
+ if(typeof(window) != "undefined") {
+ window.Protocol = Protocol;
+ }
+})(typeof(window)=="undefined" ? module.exports : (this.Protocol = {}),typeof(window)=="undefined" ? Buffer : Uint8Array, this);
diff --git a/examples/singleon/chat/web/starx-wsclient.js b/examples/singleon/chat/web/starx-wsclient.js
new file mode 100644
index 0000000..7bf20ad
--- /dev/null
+++ b/examples/singleon/chat/web/starx-wsclient.js
@@ -0,0 +1,573 @@
+(function() {
+ function Emitter(obj) {
+ if (obj) return mixin(obj);
+ }
+ /**
+ * Mixin the emitter properties.
+ *
+ * @param {Object} obj
+ * @return {Object}
+ * @api private
+ */
+
+ function mixin(obj) {
+ for (var key in Emitter.prototype) {
+ obj[key] = Emitter.prototype[key];
+ }
+ return obj;
+ }
+
+ /**
+ * Listen on the given `event` with `fn`.
+ *
+ * @param {String} event
+ * @param {Function} fn
+ * @return {Emitter}
+ * @api public
+ */
+
+ Emitter.prototype.on =
+ Emitter.prototype.addEventListener = function(event, fn){
+ this._callbacks = this._callbacks || {};
+ (this._callbacks[event] = this._callbacks[event] || [])
+ .push(fn);
+ return this;
+ };
+
+ /**
+ * Adds an `event` listener that will be invoked a single
+ * time then automatically removed.
+ *
+ * @param {String} event
+ * @param {Function} fn
+ * @return {Emitter}
+ * @api public
+ */
+
+ Emitter.prototype.once = function(event, fn){
+ var self = this;
+ this._callbacks = this._callbacks || {};
+
+ function on() {
+ self.off(event, on);
+ fn.apply(this, arguments);
+ }
+
+ on.fn = fn;
+ this.on(event, on);
+ return this;
+ };
+
+ /**
+ * Remove the given callback for `event` or all
+ * registered callbacks.
+ *
+ * @param {String} event
+ * @param {Function} fn
+ * @return {Emitter}
+ * @api public
+ */
+
+ Emitter.prototype.off =
+ Emitter.prototype.removeListener =
+ Emitter.prototype.removeAllListeners =
+ Emitter.prototype.removeEventListener = function(event, fn){
+ this._callbacks = this._callbacks || {};
+
+ // all
+ if (0 == arguments.length) {
+ this._callbacks = {};
+ return this;
+ }
+
+ // specific event
+ var callbacks = this._callbacks[event];
+ if (!callbacks) return this;
+
+ // remove all handlers
+ if (1 == arguments.length) {
+ delete this._callbacks[event];
+ return this;
+ }
+
+ // remove specific handler
+ var cb;
+ for (var i = 0; i < callbacks.length; i++) {
+ cb = callbacks[i];
+ if (cb === fn || cb.fn === fn) {
+ callbacks.splice(i, 1);
+ break;
+ }
+ }
+ return this;
+ };
+
+ /**
+ * Emit `event` with the given args.
+ *
+ * @param {String} event
+ * @param {Mixed} ...
+ * @return {Emitter}
+ */
+
+ Emitter.prototype.emit = function(event){
+ this._callbacks = this._callbacks || {};
+ var args = [].slice.call(arguments, 1)
+ , callbacks = this._callbacks[event];
+
+ if (callbacks) {
+ callbacks = callbacks.slice(0);
+ for (var i = 0, len = callbacks.length; i < len; ++i) {
+ callbacks[i].apply(this, args);
+ }
+ }
+
+ return this;
+ };
+
+ /**
+ * Return array of callbacks for `event`.
+ *
+ * @param {String} event
+ * @return {Array}
+ * @api public
+ */
+
+ Emitter.prototype.listeners = function(event){
+ this._callbacks = this._callbacks || {};
+ return this._callbacks[event] || [];
+ };
+
+ /**
+ * Check if this emitter has `event` handlers.
+ *
+ * @param {String} event
+ * @return {Boolean}
+ * @api public
+ */
+
+ Emitter.prototype.hasListeners = function(event){
+ return !! this.listeners(event).length;
+ };
+ var JS_WS_CLIENT_TYPE = 'js-websocket';
+ var JS_WS_CLIENT_VERSION = '0.0.1';
+
+ var Protocol = window.Protocol;
+ var decodeIO_encoder = null;
+ var decodeIO_decoder = null;
+ var Package = Protocol.Package;
+ var Message = Protocol.Message;
+ var EventEmitter = Emitter;
+ var rsa = window.rsa;
+
+ if(typeof(window) != "undefined" && typeof(sys) != 'undefined' && sys.localStorage) {
+ window.localStorage = sys.localStorage;
+ }
+
+ var RES_OK = 200;
+ var RES_FAIL = 500;
+ var RES_OLD_CLIENT = 501;
+
+ if (typeof Object.create !== 'function') {
+ Object.create = function (o) {
+ function F() {}
+ F.prototype = o;
+ return new F();
+ };
+ }
+
+ var root = window;
+ var starx = Object.create(EventEmitter.prototype); // object extend from object
+ root.starx = starx;
+ var socket = null;
+ var reqId = 0;
+ var callbacks = {};
+ var handlers = {};
+ //Map from request id to route
+ var routeMap = {};
+ var dict = {}; // route string to code
+ var abbrs = {}; // code to route string
+
+ var heartbeatInterval = 0;
+ var heartbeatTimeout = 0;
+ var nextHeartbeatTimeout = 0;
+ var gapThreshold = 100; // heartbeat gap threashold
+ var heartbeatId = null;
+ var heartbeatTimeoutId = null;
+ var handshakeCallback = null;
+
+ var decode = null;
+ var encode = null;
+
+ var reconnect = false;
+ var reconncetTimer = null;
+ var reconnectUrl = null;
+ var reconnectAttempts = 0;
+ var reconnectionDelay = 5000;
+ var DEFAULT_MAX_RECONNECT_ATTEMPTS = 10;
+
+ var useCrypto;
+
+ var handshakeBuffer = {
+ 'sys': {
+ type: JS_WS_CLIENT_TYPE,
+ version: JS_WS_CLIENT_VERSION,
+ rsa: {}
+ },
+ 'user': {
+ }
+ };
+
+ var initCallback = null;
+
+ starx.init = function(params, cb) {
+ initCallback = cb;
+ var host = params.host;
+ var port = params.port;
+ var path = params.path;
+
+ encode = params.encode || defaultEncode;
+ decode = params.decode || defaultDecode;
+
+ var url = 'ws://' + host;
+ if(port) {
+ url += ':' + port;
+ }
+
+ if(path) {
+ url += path;
+ }
+
+ handshakeBuffer.user = params.user;
+ if(params.encrypt) {
+ useCrypto = true;
+ rsa.generate(1024, "10001");
+ var data = {
+ rsa_n: rsa.n.toString(16),
+ rsa_e: rsa.e
+ };
+ handshakeBuffer.sys.rsa = data;
+ }
+ handshakeCallback = params.handshakeCallback;
+ connect(params, url, cb);
+ };
+
+ var defaultDecode = starx.decode = function(data) {
+ var msg = Message.decode(data);
+
+ if(msg.id > 0){
+ msg.route = routeMap[msg.id];
+ delete routeMap[msg.id];
+ if(!msg.route){
+ return;
+ }
+ }
+
+ msg.body = deCompose(msg);
+ return msg;
+ };
+
+ var defaultEncode = starx.encode = function(reqId, route, msg) {
+ var type = reqId ? Message.TYPE_REQUEST : Message.TYPE_NOTIFY;
+
+ if(decodeIO_encoder && decodeIO_encoder.lookup(route)) {
+ var Builder = decodeIO_encoder.build(route);
+ msg = new Builder(msg).encodeNB();
+ } else {
+ msg = Protocol.strencode(JSON.stringify(msg));
+ }
+
+ var compressRoute = 0;
+ if(dict && dict[route]) {
+ route = dict[route];
+ compressRoute = 1;
+ }
+
+ return Message.encode(reqId, type, compressRoute, route, msg);
+ };
+
+ var connect = function(params, url, cb) {
+ console.log('connect to ' + url);
+
+ var params = params || {};
+ var maxReconnectAttempts = params.maxReconnectAttempts || DEFAULT_MAX_RECONNECT_ATTEMPTS;
+ reconnectUrl = url;
+
+ var onopen = function(event) {
+ if(!!reconnect) {
+ starx.emit('reconnect');
+ }
+ reset();
+ var obj = Package.encode(Package.TYPE_HANDSHAKE, Protocol.strencode(JSON.stringify(handshakeBuffer)));
+ send(obj);
+ };
+ var onmessage = function(event) {
+ processPackage(Package.decode(event.data), cb);
+ // new package arrived, update the heartbeat timeout
+ if(heartbeatTimeout) {
+ nextHeartbeatTimeout = Date.now() + heartbeatTimeout;
+ }
+ };
+ var onerror = function(event) {
+ starx.emit('io-error', event);
+ console.error('socket error: ', event);
+ };
+ var onclose = function(event) {
+ starx.emit('close',event);
+ starx.emit('disconnect', event);
+ console.log('socket close: ', event);
+ if(!!params.reconnect && reconnectAttempts < maxReconnectAttempts) {
+ reconnect = true;
+ reconnectAttempts++;
+ reconncetTimer = setTimeout(function() {
+ connect(params, reconnectUrl, cb);
+ }, reconnectionDelay);
+ reconnectionDelay *= 2;
+ }
+ };
+ socket = new WebSocket(url);
+ socket.binaryType = 'arraybuffer';
+ socket.onopen = onopen;
+ socket.onmessage = onmessage;
+ socket.onerror = onerror;
+ socket.onclose = onclose;
+ };
+
+ starx.disconnect = function() {
+ if(socket) {
+ if(socket.disconnect) socket.disconnect();
+ if(socket.close) socket.close();
+ console.log('disconnect');
+ socket = null;
+ }
+
+ if(heartbeatId) {
+ clearTimeout(heartbeatId);
+ heartbeatId = null;
+ }
+ if(heartbeatTimeoutId) {
+ clearTimeout(heartbeatTimeoutId);
+ heartbeatTimeoutId = null;
+ }
+ };
+
+ var reset = function() {
+ reconnect = false;
+ reconnectionDelay = 1000 * 5;
+ reconnectAttempts = 0;
+ clearTimeout(reconncetTimer);
+ };
+
+ starx.request = function(route, msg, cb) {
+ if(arguments.length === 2 && typeof msg === 'function') {
+ cb = msg;
+ msg = {};
+ } else {
+ msg = msg || {};
+ }
+ route = route || msg.route;
+ if(!route) {
+ return;
+ }
+
+ reqId++;
+ sendMessage(reqId, route, msg);
+
+ callbacks[reqId] = cb;
+ routeMap[reqId] = route;
+ };
+
+ starx.notify = function(route, msg) {
+ msg = msg || {};
+ sendMessage(0, route, msg);
+ };
+
+ var sendMessage = function(reqId, route, msg) {
+ if(useCrypto) {
+ msg = JSON.stringify(msg);
+ var sig = rsa.signString(msg, "sha256");
+ msg = JSON.parse(msg);
+ msg['__crypto__'] = sig;
+ }
+
+ if(encode) {
+ msg = encode(reqId, route, msg);
+ }
+
+ var packet = Package.encode(Package.TYPE_DATA, msg);
+ send(packet);
+ };
+
+ var send = function(packet) {
+ socket.send(packet.buffer);
+ };
+
+ var handler = {};
+
+ var heartbeat = function(data) {
+ if(!heartbeatInterval) {
+ // no heartbeat
+ return;
+ }
+
+ var obj = Package.encode(Package.TYPE_HEARTBEAT);
+ if(heartbeatTimeoutId) {
+ clearTimeout(heartbeatTimeoutId);
+ heartbeatTimeoutId = null;
+ }
+
+ if(heartbeatId) {
+ // already in a heartbeat interval
+ return;
+ }
+ heartbeatId = setTimeout(function() {
+ heartbeatId = null;
+ send(obj);
+
+ nextHeartbeatTimeout = Date.now() + heartbeatTimeout;
+ heartbeatTimeoutId = setTimeout(heartbeatTimeoutCb, heartbeatTimeout);
+ }, heartbeatInterval);
+ };
+
+ var heartbeatTimeoutCb = function() {
+ var gap = nextHeartbeatTimeout - Date.now();
+ if(gap > gapThreshold) {
+ heartbeatTimeoutId = setTimeout(heartbeatTimeoutCb, gap);
+ } else {
+ console.error('server heartbeat timeout');
+ starx.emit('heartbeat timeout');
+ starx.disconnect();
+ }
+ };
+
+ var handshake = function(data) {
+ data = JSON.parse(Protocol.strdecode(data));
+ if(data.code === RES_OLD_CLIENT) {
+ starx.emit('error', 'client version not fullfill');
+ return;
+ }
+
+ if(data.code !== RES_OK) {
+ starx.emit('error', 'handshake fail');
+ return;
+ }
+
+ handshakeInit(data);
+
+ var obj = Package.encode(Package.TYPE_HANDSHAKE_ACK);
+ send(obj);
+ if(initCallback) {
+ initCallback(socket);
+ }
+ };
+
+ var onData = function(data) {
+ var msg = data;
+ if(decode) {
+ msg = decode(msg);
+ }
+ processMessage(starx, msg);
+ };
+
+ var onKick = function(data) {
+ data = JSON.parse(Protocol.strdecode(data));
+ starx.emit('onKick', data);
+ };
+
+ handlers[Package.TYPE_HANDSHAKE] = handshake;
+ handlers[Package.TYPE_HEARTBEAT] = heartbeat;
+ handlers[Package.TYPE_DATA] = onData;
+ handlers[Package.TYPE_KICK] = onKick;
+
+ var processPackage = function(msgs) {
+ if(Array.isArray(msgs)) {
+ for(var i=0; i