diff --git a/examples/cluster/.gitkeep b/examples/cluster/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/examples/singleon/chat/README.md b/examples/singleon/chat/README.md new file mode 100644 index 0000000..785c15f --- /dev/null +++ b/examples/singleon/chat/README.md @@ -0,0 +1,15 @@ +# starx-chat-demo +chat room demo base on [ngs](https://git.noahlan.cn/noahlan/ngs.git) in 100 lines + +refs: https://git.noahlan.cn/noahlan/ngs + +## Required +- golang +- websocket + +## Run +``` +go run main.go +``` + +open browser => http://localhost:3250/web/ \ No newline at end of file diff --git a/examples/singleon/chat/main.go b/examples/singleon/chat/main.go new file mode 100644 index 0000000..9ae2570 --- /dev/null +++ b/examples/singleon/chat/main.go @@ -0,0 +1,161 @@ +package main + +import ( + "fmt" + "git.noahlan.cn/northlan/ngs" + "git.noahlan.cn/northlan/ngs/component" + "git.noahlan.cn/northlan/ngs/pipeline" + "git.noahlan.cn/northlan/ngs/scheduler" + "git.noahlan.cn/northlan/ngs/serialize/json" + "git.noahlan.cn/northlan/ngs/session" + "log" + "net/http" + "strings" + "time" +) + +type ( + Room struct { + group *ngs.Group + } + + // RoomManager represents a component that contains a bundle of room + RoomManager struct { + component.Base + timer *scheduler.Timer + rooms map[int]*Room + } + + // UserMessage represents a message that user sent + UserMessage struct { + Name string `json:"name"` + Content string `json:"content"` + } + + // NewUser message will be received when new user join room + NewUser struct { + Content string `json:"content"` + } + + // AllMembers contains all members uid + AllMembers struct { + Members []int64 `json:"members"` + } + + // JoinResponse represents the result of joining room + JoinResponse struct { + Code int `json:"code"` + Result string `json:"result"` + } + + stats struct { + component.Base + timer *scheduler.Timer + outboundBytes int + inboundBytes int + } +) + +func (stats *stats) outbound(s *session.Session, msg *pipeline.Message) error { + stats.outboundBytes += len(msg.Data) + return nil +} + +func (stats *stats) inbound(s *session.Session, msg *pipeline.Message) error { + stats.inboundBytes += len(msg.Data) + return nil +} + +func (stats *stats) AfterInit() { + stats.timer = scheduler.NewTimer(time.Minute, func() { + println("OutboundBytes", stats.outboundBytes) + println("InboundBytes", stats.outboundBytes) + }) +} + +const ( + testRoomID = 1 + roomIDKey = "ROOM_ID" +) + +func NewRoomManager() *RoomManager { + return &RoomManager{ + rooms: map[int]*Room{}, + } +} + +// AfterInit component lifetime callback +func (mgr *RoomManager) AfterInit() { + session.Lifetime.OnClosed(func(s *session.Session) { + if !s.HasKey(roomIDKey) { + return + } + room := s.Value(roomIDKey).(*Room) + room.group.Leave(s) + }) + mgr.timer = scheduler.NewTimer(time.Minute, func() { + for roomId, room := range mgr.rooms { + println(fmt.Sprintf("UserCount: RoomID=%d, Time=%s, Count=%d", + roomId, time.Now().String(), room.group.Count())) + } + }) +} + +// Join room +func (mgr *RoomManager) Join(s *session.Session, msg []byte) error { + // NOTE: join test room only in demo + room, found := mgr.rooms[testRoomID] + if !found { + room = &Room{ + group: ngs.NewGroup(fmt.Sprintf("room-%d", testRoomID)), + } + mgr.rooms[testRoomID] = room + } + + fakeUID := s.ID() //just use s.ID as uid !!! + s.Bind(fakeUID) // binding session uids.Set(roomIDKey, room) + s.Set(roomIDKey, room) + s.Push("onMembers", &AllMembers{Members: room.group.Members()}) + // notify others + room.group.Broadcast("onNewUser", &NewUser{Content: fmt.Sprintf("New user: %d", s.ID())}) + // new user join group + room.group.Add(s) // add session to group + return s.Response(&JoinResponse{Result: "success"}) +} + +// Message sync last message to all members +func (mgr *RoomManager) Message(s *session.Session, msg *UserMessage) error { + if !s.HasKey(roomIDKey) { + return fmt.Errorf("not join room yet") + } + room := s.Value(roomIDKey).(*Room) + return room.group.Broadcast("onMessage", msg) +} + +func main() { + components := &component.Components{} + components.Register( + NewRoomManager(), + component.WithName("room"), // rewrite component and handler name + component.WithNameFunc(strings.ToLower), + ) + + // traffic stats + pip := pipeline.New() + var stats = &stats{} + pip.Outbound().PushBack(stats.outbound) + pip.Inbound().PushBack(stats.inbound) + + log.SetFlags(log.LstdFlags | log.Llongfile) + http.Handle("/web/", http.StripPrefix("/web/", http.FileServer(http.Dir("web")))) + + ngs.Listen(":3250", + ngs.WithIsWebsocket(true), + ngs.WithPipeline(pip), + ngs.WithCheckOriginFunc(func(_ *http.Request) bool { return true }), + ngs.WithWSPath("/nano"), + ngs.WithDebugMode(), + ngs.WithSerializer(json.NewSerializer()), // override default serializer + ngs.WithComponents(components), + ) +} diff --git a/examples/singleon/chat/web/index.html b/examples/singleon/chat/web/index.html new file mode 100644 index 0000000..36eb6e5 --- /dev/null +++ b/examples/singleon/chat/web/index.html @@ -0,0 +1,69 @@ + + + + + Chat Demo + + +
+ +
+ + + +
+
+ + + + + + + \ No newline at end of file diff --git a/examples/singleon/chat/web/protocol.js b/examples/singleon/chat/web/protocol.js new file mode 100644 index 0000000..b16420d --- /dev/null +++ b/examples/singleon/chat/web/protocol.js @@ -0,0 +1,349 @@ +(function (exports, ByteArray, global) { + var Protocol = exports; + + var PKG_HEAD_BYTES = 4; + var MSG_FLAG_BYTES = 1; + var MSG_ROUTE_CODE_BYTES = 2; + var MSG_ID_MAX_BYTES = 5; + var MSG_ROUTE_LEN_BYTES = 1; + + var MSG_ROUTE_CODE_MAX = 0xffff; + + var MSG_COMPRESS_ROUTE_MASK = 0x1; + var MSG_TYPE_MASK = 0x7; + + var Package = Protocol.Package = {}; + var Message = Protocol.Message = {}; + + Package.TYPE_HANDSHAKE = 1; + Package.TYPE_HANDSHAKE_ACK = 2; + Package.TYPE_HEARTBEAT = 3; + Package.TYPE_DATA = 4; + Package.TYPE_KICK = 5; + + Message.TYPE_REQUEST = 0; + Message.TYPE_NOTIFY = 1; + Message.TYPE_RESPONSE = 2; + Message.TYPE_PUSH = 3; + + /** + * pomele client encode + * id message id; + * route message route + * msg message body + * socketio current support string + */ + Protocol.strencode = function(str) { + var byteArray = new ByteArray(str.length * 3); + var offset = 0; + for(var i = 0; i < str.length; i++){ + var charCode = str.charCodeAt(i); + var codes = null; + if(charCode <= 0x7f){ + codes = [charCode]; + }else if(charCode <= 0x7ff){ + codes = [0xc0|(charCode>>6), 0x80|(charCode & 0x3f)]; + }else{ + codes = [0xe0|(charCode>>12), 0x80|((charCode & 0xfc0)>>6), 0x80|(charCode & 0x3f)]; + } + for(var j = 0; j < codes.length; j++){ + byteArray[offset] = codes[j]; + ++offset; + } + } + var _buffer = new ByteArray(offset); + copyArray(_buffer, 0, byteArray, 0, offset); + return _buffer; + }; + + /** + * client decode + * msg String data + * return Message Object + */ + Protocol.strdecode = function(buffer) { + var bytes = new ByteArray(buffer); + var array = []; + var offset = 0; + var charCode = 0; + var end = bytes.length; + while(offset < end){ + if(bytes[offset] < 128){ + charCode = bytes[offset]; + offset += 1; + }else if(bytes[offset] < 224){ + charCode = ((bytes[offset] & 0x3f)<<6) + (bytes[offset+1] & 0x3f); + offset += 2; + }else{ + charCode = ((bytes[offset] & 0x0f)<<12) + ((bytes[offset+1] & 0x3f)<<6) + (bytes[offset+2] & 0x3f); + offset += 3; + } + array.push(charCode); + } + return String.fromCharCode.apply(null, array); + }; + + /** + * Package protocol encode. + * + * Pomelo package format: + * +------+-------------+------------------+ + * | type | body length | body | + * +------+-------------+------------------+ + * + * Head: 4bytes + * 0: package type, + * 1 - handshake, + * 2 - handshake ack, + * 3 - heartbeat, + * 4 - data + * 5 - kick + * 1 - 3: big-endian body length + * Body: body length bytes + * + * @param {Number} type package type + * @param {ByteArray} body body content in bytes + * @return {ByteArray} new byte array that contains encode result + */ + Package.encode = function(type, body){ + var length = body ? body.length : 0; + var buffer = new ByteArray(PKG_HEAD_BYTES + length); + var index = 0; + buffer[index++] = type & 0xff; + buffer[index++] = (length >> 16) & 0xff; + buffer[index++] = (length >> 8) & 0xff; + buffer[index++] = length & 0xff; + if(body) { + copyArray(buffer, index, body, 0, length); + } + return buffer; + }; + + /** + * Package protocol decode. + * See encode for package format. + * + * @param {ByteArray} buffer byte array containing package content + * @return {Object} {type: package type, buffer: body byte array} + */ + Package.decode = function(buffer){ + var offset = 0; + var bytes = new ByteArray(buffer); + var length = 0; + var rs = []; + while(offset < bytes.length) { + var type = bytes[offset++]; + length = ((bytes[offset++]) << 16 | (bytes[offset++]) << 8 | bytes[offset++]) >>> 0; + var body = length ? new ByteArray(length) : null; + copyArray(body, 0, bytes, offset, length); + offset += length; + rs.push({'type': type, 'body': body}); + } + return rs.length === 1 ? rs[0]: rs; + }; + + /** + * Message protocol encode. + * + * @param {Number} id message id + * @param {Number} type message type + * @param {Number} compressRoute whether compress route + * @param {Number|String} route route code or route string + * @param {Buffer} msg message body bytes + * @return {Buffer} encode result + */ + Message.encode = function(id, type, compressRoute, route, msg){ + // caculate message max length + var idBytes = msgHasId(type) ? caculateMsgIdBytes(id) : 0; + var msgLen = MSG_FLAG_BYTES + idBytes; + + if(msgHasRoute(type)) { + if(compressRoute) { + if(typeof route !== 'number'){ + throw new Error('error flag for number route!'); + } + msgLen += MSG_ROUTE_CODE_BYTES; + } else { + msgLen += MSG_ROUTE_LEN_BYTES; + if(route) { + route = Protocol.strencode(route); + if(route.length>255) { + throw new Error('route maxlength is overflow'); + } + msgLen += route.length; + } + } + } + + if(msg) { + msgLen += msg.length; + } + + var buffer = new ByteArray(msgLen); + var offset = 0; + + // add flag + offset = encodeMsgFlag(type, compressRoute, buffer, offset); + + // add message id + if(msgHasId(type)) { + offset = encodeMsgId(id, buffer, offset); + } + + // add route + if(msgHasRoute(type)) { + offset = encodeMsgRoute(compressRoute, route, buffer, offset); + } + + // add body + if(msg) { + offset = encodeMsgBody(msg, buffer, offset); + } + + return buffer; + }; + + /** + * Message protocol decode. + * + * @param {Buffer|Uint8Array} buffer message bytes + * @return {Object} message object + */ + Message.decode = function(buffer) { + var bytes = new ByteArray(buffer); + var bytesLen = bytes.length || bytes.byteLength; + var offset = 0; + var id = 0; + var route = null; + + // parse flag + var flag = bytes[offset++]; + var compressRoute = flag & MSG_COMPRESS_ROUTE_MASK; + var type = (flag >> 1) & MSG_TYPE_MASK; + + // parse id + if(msgHasId(type)) { + var m = parseInt(bytes[offset]); + var i = 0; + do{ + var m = parseInt(bytes[offset]); + id = id + ((m & 0x7f) * Math.pow(2,(7*i))); + offset++; + i++; + }while(m >= 128); + } + + // parse route + if(msgHasRoute(type)) { + if(compressRoute) { + route = (bytes[offset++]) << 8 | bytes[offset++]; + } else { + var routeLen = bytes[offset++]; + if(routeLen) { + route = new ByteArray(routeLen); + copyArray(route, 0, bytes, offset, routeLen); + route = Protocol.strdecode(route); + } else { + route = ''; + } + offset += routeLen; + } + } + + // parse body + var bodyLen = bytesLen - offset; + var body = new ByteArray(bodyLen); + + copyArray(body, 0, bytes, offset, bodyLen); + + return {'id': id, 'type': type, 'compressRoute': compressRoute, + 'route': route, 'body': body}; + }; + + var copyArray = function(dest, doffset, src, soffset, length) { + if('function' === typeof src.copy) { + // Buffer + src.copy(dest, doffset, soffset, soffset + length); + } else { + // Uint8Array + for(var index=0; index>= 7; + } while(id > 0); + return len; + }; + + var encodeMsgFlag = function(type, compressRoute, buffer, offset) { + if(type !== Message.TYPE_REQUEST && type !== Message.TYPE_NOTIFY && + type !== Message.TYPE_RESPONSE && type !== Message.TYPE_PUSH) { + throw new Error('unkonw message type: ' + type); + } + + buffer[offset] = (type << 1) | (compressRoute ? 1 : 0); + + return offset + MSG_FLAG_BYTES; + }; + + var encodeMsgId = function(id, buffer, offset) { + do{ + var tmp = id % 128; + var next = Math.floor(id/128); + + if(next !== 0){ + tmp = tmp + 128; + } + buffer[offset++] = tmp; + + id = next; + } while(id !== 0); + + return offset; + }; + + var encodeMsgRoute = function(compressRoute, route, buffer, offset) { + if (compressRoute) { + if(route > MSG_ROUTE_CODE_MAX){ + throw new Error('route number is overflow'); + } + + buffer[offset++] = (route >> 8) & 0xff; + buffer[offset++] = route & 0xff; + } else { + if(route) { + buffer[offset++] = route.length & 0xff; + copyArray(buffer, offset, route, 0, route.length); + offset += route.length; + } else { + buffer[offset++] = 0; + } + } + + return offset; + }; + + var encodeMsgBody = function(msg, buffer, offset) { + copyArray(buffer, offset, msg, 0, msg.length); + return offset + msg.length; + }; + + if(typeof(window) != "undefined") { + window.Protocol = Protocol; + } +})(typeof(window)=="undefined" ? module.exports : (this.Protocol = {}),typeof(window)=="undefined" ? Buffer : Uint8Array, this); diff --git a/examples/singleon/chat/web/starx-wsclient.js b/examples/singleon/chat/web/starx-wsclient.js new file mode 100644 index 0000000..7bf20ad --- /dev/null +++ b/examples/singleon/chat/web/starx-wsclient.js @@ -0,0 +1,573 @@ +(function() { + function Emitter(obj) { + if (obj) return mixin(obj); + } + /** + * Mixin the emitter properties. + * + * @param {Object} obj + * @return {Object} + * @api private + */ + + function mixin(obj) { + for (var key in Emitter.prototype) { + obj[key] = Emitter.prototype[key]; + } + return obj; + } + + /** + * Listen on the given `event` with `fn`. + * + * @param {String} event + * @param {Function} fn + * @return {Emitter} + * @api public + */ + + Emitter.prototype.on = + Emitter.prototype.addEventListener = function(event, fn){ + this._callbacks = this._callbacks || {}; + (this._callbacks[event] = this._callbacks[event] || []) + .push(fn); + return this; + }; + + /** + * Adds an `event` listener that will be invoked a single + * time then automatically removed. + * + * @param {String} event + * @param {Function} fn + * @return {Emitter} + * @api public + */ + + Emitter.prototype.once = function(event, fn){ + var self = this; + this._callbacks = this._callbacks || {}; + + function on() { + self.off(event, on); + fn.apply(this, arguments); + } + + on.fn = fn; + this.on(event, on); + return this; + }; + + /** + * Remove the given callback for `event` or all + * registered callbacks. + * + * @param {String} event + * @param {Function} fn + * @return {Emitter} + * @api public + */ + + Emitter.prototype.off = + Emitter.prototype.removeListener = + Emitter.prototype.removeAllListeners = + Emitter.prototype.removeEventListener = function(event, fn){ + this._callbacks = this._callbacks || {}; + + // all + if (0 == arguments.length) { + this._callbacks = {}; + return this; + } + + // specific event + var callbacks = this._callbacks[event]; + if (!callbacks) return this; + + // remove all handlers + if (1 == arguments.length) { + delete this._callbacks[event]; + return this; + } + + // remove specific handler + var cb; + for (var i = 0; i < callbacks.length; i++) { + cb = callbacks[i]; + if (cb === fn || cb.fn === fn) { + callbacks.splice(i, 1); + break; + } + } + return this; + }; + + /** + * Emit `event` with the given args. + * + * @param {String} event + * @param {Mixed} ... + * @return {Emitter} + */ + + Emitter.prototype.emit = function(event){ + this._callbacks = this._callbacks || {}; + var args = [].slice.call(arguments, 1) + , callbacks = this._callbacks[event]; + + if (callbacks) { + callbacks = callbacks.slice(0); + for (var i = 0, len = callbacks.length; i < len; ++i) { + callbacks[i].apply(this, args); + } + } + + return this; + }; + + /** + * Return array of callbacks for `event`. + * + * @param {String} event + * @return {Array} + * @api public + */ + + Emitter.prototype.listeners = function(event){ + this._callbacks = this._callbacks || {}; + return this._callbacks[event] || []; + }; + + /** + * Check if this emitter has `event` handlers. + * + * @param {String} event + * @return {Boolean} + * @api public + */ + + Emitter.prototype.hasListeners = function(event){ + return !! this.listeners(event).length; + }; + var JS_WS_CLIENT_TYPE = 'js-websocket'; + var JS_WS_CLIENT_VERSION = '0.0.1'; + + var Protocol = window.Protocol; + var decodeIO_encoder = null; + var decodeIO_decoder = null; + var Package = Protocol.Package; + var Message = Protocol.Message; + var EventEmitter = Emitter; + var rsa = window.rsa; + + if(typeof(window) != "undefined" && typeof(sys) != 'undefined' && sys.localStorage) { + window.localStorage = sys.localStorage; + } + + var RES_OK = 200; + var RES_FAIL = 500; + var RES_OLD_CLIENT = 501; + + if (typeof Object.create !== 'function') { + Object.create = function (o) { + function F() {} + F.prototype = o; + return new F(); + }; + } + + var root = window; + var starx = Object.create(EventEmitter.prototype); // object extend from object + root.starx = starx; + var socket = null; + var reqId = 0; + var callbacks = {}; + var handlers = {}; + //Map from request id to route + var routeMap = {}; + var dict = {}; // route string to code + var abbrs = {}; // code to route string + + var heartbeatInterval = 0; + var heartbeatTimeout = 0; + var nextHeartbeatTimeout = 0; + var gapThreshold = 100; // heartbeat gap threashold + var heartbeatId = null; + var heartbeatTimeoutId = null; + var handshakeCallback = null; + + var decode = null; + var encode = null; + + var reconnect = false; + var reconncetTimer = null; + var reconnectUrl = null; + var reconnectAttempts = 0; + var reconnectionDelay = 5000; + var DEFAULT_MAX_RECONNECT_ATTEMPTS = 10; + + var useCrypto; + + var handshakeBuffer = { + 'sys': { + type: JS_WS_CLIENT_TYPE, + version: JS_WS_CLIENT_VERSION, + rsa: {} + }, + 'user': { + } + }; + + var initCallback = null; + + starx.init = function(params, cb) { + initCallback = cb; + var host = params.host; + var port = params.port; + var path = params.path; + + encode = params.encode || defaultEncode; + decode = params.decode || defaultDecode; + + var url = 'ws://' + host; + if(port) { + url += ':' + port; + } + + if(path) { + url += path; + } + + handshakeBuffer.user = params.user; + if(params.encrypt) { + useCrypto = true; + rsa.generate(1024, "10001"); + var data = { + rsa_n: rsa.n.toString(16), + rsa_e: rsa.e + }; + handshakeBuffer.sys.rsa = data; + } + handshakeCallback = params.handshakeCallback; + connect(params, url, cb); + }; + + var defaultDecode = starx.decode = function(data) { + var msg = Message.decode(data); + + if(msg.id > 0){ + msg.route = routeMap[msg.id]; + delete routeMap[msg.id]; + if(!msg.route){ + return; + } + } + + msg.body = deCompose(msg); + return msg; + }; + + var defaultEncode = starx.encode = function(reqId, route, msg) { + var type = reqId ? Message.TYPE_REQUEST : Message.TYPE_NOTIFY; + + if(decodeIO_encoder && decodeIO_encoder.lookup(route)) { + var Builder = decodeIO_encoder.build(route); + msg = new Builder(msg).encodeNB(); + } else { + msg = Protocol.strencode(JSON.stringify(msg)); + } + + var compressRoute = 0; + if(dict && dict[route]) { + route = dict[route]; + compressRoute = 1; + } + + return Message.encode(reqId, type, compressRoute, route, msg); + }; + + var connect = function(params, url, cb) { + console.log('connect to ' + url); + + var params = params || {}; + var maxReconnectAttempts = params.maxReconnectAttempts || DEFAULT_MAX_RECONNECT_ATTEMPTS; + reconnectUrl = url; + + var onopen = function(event) { + if(!!reconnect) { + starx.emit('reconnect'); + } + reset(); + var obj = Package.encode(Package.TYPE_HANDSHAKE, Protocol.strencode(JSON.stringify(handshakeBuffer))); + send(obj); + }; + var onmessage = function(event) { + processPackage(Package.decode(event.data), cb); + // new package arrived, update the heartbeat timeout + if(heartbeatTimeout) { + nextHeartbeatTimeout = Date.now() + heartbeatTimeout; + } + }; + var onerror = function(event) { + starx.emit('io-error', event); + console.error('socket error: ', event); + }; + var onclose = function(event) { + starx.emit('close',event); + starx.emit('disconnect', event); + console.log('socket close: ', event); + if(!!params.reconnect && reconnectAttempts < maxReconnectAttempts) { + reconnect = true; + reconnectAttempts++; + reconncetTimer = setTimeout(function() { + connect(params, reconnectUrl, cb); + }, reconnectionDelay); + reconnectionDelay *= 2; + } + }; + socket = new WebSocket(url); + socket.binaryType = 'arraybuffer'; + socket.onopen = onopen; + socket.onmessage = onmessage; + socket.onerror = onerror; + socket.onclose = onclose; + }; + + starx.disconnect = function() { + if(socket) { + if(socket.disconnect) socket.disconnect(); + if(socket.close) socket.close(); + console.log('disconnect'); + socket = null; + } + + if(heartbeatId) { + clearTimeout(heartbeatId); + heartbeatId = null; + } + if(heartbeatTimeoutId) { + clearTimeout(heartbeatTimeoutId); + heartbeatTimeoutId = null; + } + }; + + var reset = function() { + reconnect = false; + reconnectionDelay = 1000 * 5; + reconnectAttempts = 0; + clearTimeout(reconncetTimer); + }; + + starx.request = function(route, msg, cb) { + if(arguments.length === 2 && typeof msg === 'function') { + cb = msg; + msg = {}; + } else { + msg = msg || {}; + } + route = route || msg.route; + if(!route) { + return; + } + + reqId++; + sendMessage(reqId, route, msg); + + callbacks[reqId] = cb; + routeMap[reqId] = route; + }; + + starx.notify = function(route, msg) { + msg = msg || {}; + sendMessage(0, route, msg); + }; + + var sendMessage = function(reqId, route, msg) { + if(useCrypto) { + msg = JSON.stringify(msg); + var sig = rsa.signString(msg, "sha256"); + msg = JSON.parse(msg); + msg['__crypto__'] = sig; + } + + if(encode) { + msg = encode(reqId, route, msg); + } + + var packet = Package.encode(Package.TYPE_DATA, msg); + send(packet); + }; + + var send = function(packet) { + socket.send(packet.buffer); + }; + + var handler = {}; + + var heartbeat = function(data) { + if(!heartbeatInterval) { + // no heartbeat + return; + } + + var obj = Package.encode(Package.TYPE_HEARTBEAT); + if(heartbeatTimeoutId) { + clearTimeout(heartbeatTimeoutId); + heartbeatTimeoutId = null; + } + + if(heartbeatId) { + // already in a heartbeat interval + return; + } + heartbeatId = setTimeout(function() { + heartbeatId = null; + send(obj); + + nextHeartbeatTimeout = Date.now() + heartbeatTimeout; + heartbeatTimeoutId = setTimeout(heartbeatTimeoutCb, heartbeatTimeout); + }, heartbeatInterval); + }; + + var heartbeatTimeoutCb = function() { + var gap = nextHeartbeatTimeout - Date.now(); + if(gap > gapThreshold) { + heartbeatTimeoutId = setTimeout(heartbeatTimeoutCb, gap); + } else { + console.error('server heartbeat timeout'); + starx.emit('heartbeat timeout'); + starx.disconnect(); + } + }; + + var handshake = function(data) { + data = JSON.parse(Protocol.strdecode(data)); + if(data.code === RES_OLD_CLIENT) { + starx.emit('error', 'client version not fullfill'); + return; + } + + if(data.code !== RES_OK) { + starx.emit('error', 'handshake fail'); + return; + } + + handshakeInit(data); + + var obj = Package.encode(Package.TYPE_HANDSHAKE_ACK); + send(obj); + if(initCallback) { + initCallback(socket); + } + }; + + var onData = function(data) { + var msg = data; + if(decode) { + msg = decode(msg); + } + processMessage(starx, msg); + }; + + var onKick = function(data) { + data = JSON.parse(Protocol.strdecode(data)); + starx.emit('onKick', data); + }; + + handlers[Package.TYPE_HANDSHAKE] = handshake; + handlers[Package.TYPE_HEARTBEAT] = heartbeat; + handlers[Package.TYPE_DATA] = onData; + handlers[Package.TYPE_KICK] = onKick; + + var processPackage = function(msgs) { + if(Array.isArray(msgs)) { + for(var i=0; i