diff --git a/game_room/luaclib-src/lua-websocketnetpack.c b/game_room/luaclib-src/lua-websocketnetpack.c index 346f9bd..4063437 100644 --- a/game_room/luaclib-src/lua-websocketnetpack.c +++ b/game_room/luaclib-src/lua-websocketnetpack.c @@ -1,6 +1,6 @@ #include "skynet_malloc.h" #include "skynet_socket.h" - +#include "skynet.h" #include #include @@ -223,13 +223,12 @@ static struct uncomplete * save_uncomplete(lua_State *L, int fd) { } static uint64_t ntoh64(uint64_t host) { - uint64_t ret = 0; - uint32_t high, low; - low = host & 0xFFFFFFFF; - high = (host >> 32) & 0xFFFFFFFF; - low = ntohl(low); + uint32_t low = host & 0xFFFFFFFF; + uint32_t high = (host >> 32) & 0xFFFFFFFF; + // ntohl()是将一个无符号长整形数从网络字节顺序转换为主机字节顺序, ntohl()返回一个以主机字节顺序表达的数。 + low = ntohl(low); high = ntohl(high); - ret = low; + uint64_t ret = low; ret <<= 32; ret |= high; return ret; @@ -255,41 +254,46 @@ read_size(uint8_t * buffer, int size, int* pack_head_length, int* mask, int * is //printf("read_size2 fin=%d rsv1=%d rsv2=%d rsv3=%d opcode=%d is_mask=%d\n", fin, rsv1, rsv2, rsv3, opcode, is_mask); if (0x0 != rsv1 || 0x0 != rsv2 || 0x0 != rsv3) - { + {// 如果非 0,客户端、服务端协商采用WebSocket扩展,且值的含义由扩展进行定义。 return -2; } if (fin == 0 || opcode == 0) { + // 当 fin 如果是1,表示这是消息的最后一个分片(fragment),如果是0,表示不是是消息的最后一个分片(fragment)。 + // 当Opcode为0时,表示本次数据传输采用了数据分片,当前收到的数据帧为其中一个数据分片。 return -2; } if (opcode == 8) { + // 表示连接断开。 return -3; } int offset = 0; int pack_size = 0; - //0-125 - char length = buffer[1] & 0x7f; + + // 数据载荷长度,单位是字节。7 bits, 7+16 bits, or 7+64 bits + char payload_length = buffer[1] & 0x7f; offset += WEBSOCKET_HEADER_LEN; - //126 - if (length < 0x7E) + //0-125 -> 7位 -> 1字节 + if (payload_length < 0x7E) { - pack_size = length; + pack_size = payload_length; } - //Short - else if (0x7E == length) + //126 Short -> 7+16位 3字节 + else if (0x7E == payload_length) { if (size < WEBSOCKET_HEADER_LEN + sizeof(short)) { return -1; } - pack_size = ntohs(*((uint16_t *) (buffer+WEBSOCKET_HEADER_LEN))); + // 如果payload length占用了多个字节的话,payload length的二进制表达采用网络序(big endian,重要的位在前)。 + pack_size = ntohs(*((uint32_t *) (buffer+WEBSOCKET_HEADER_LEN))); //printf("read_size3 pack_size=%d sizeof(short)=%d sizeof(uint16_t)=%d\n", pack_size, sizeof(short), sizeof(uint16_t)); offset += sizeof(short); } else - { + {//127 -> 7+64位 if (size < WEBSOCKET_HEADER_LEN + sizeof(int64_t)) { return -1; @@ -310,6 +314,7 @@ read_size(uint8_t * buffer, int size, int* pack_head_length, int* mask, int * is char *masks = (char*)mask; memcpy(masks, (buffer + offset), WEBSOCKET_MASK_LEN); offset += WEBSOCKET_MASK_LEN; + //*hasunmask_size = WEBSOCKET_MASK_LEN } *pack_head_length = offset; @@ -386,21 +391,22 @@ push_more(lua_State *L, int fd, uint8_t *buffer, int size, int wsocket_handeshak if (wsocket_handeshake) { //认为socket初次建立连接读取握手协议 - pack_size = get_http_header(buffer, size); + pack_size = get_http_header(buffer, size); //printf("push_more wsocket_handeshake=%d buffersize=%d pack_size=%d\n", wsocket_handeshake, size, pack_size); } else { //读取帧大小 - while ((pack_size = read_size(buffer, size, &pack_head_length, &mask, &ismask, &hasunmask_size)) <= -2) + while ((pack_size = read_size(buffer, size, &pack_head_length, &mask, &ismask, &hasunmask_size)) == -2) { - mask = 0; + mask = 0; ismask = 0; hasunmask_size = 0; buffer += WEBSOCKET_HEADER_LEN; size -= WEBSOCKET_HEADER_LEN; } - if (pack_size > MAX_PACKSIZE) { + if (pack_size > MAX_PACKSIZE) + { pack_size = MAX_PACKSIZE; } //printf("push_more not wsocket_handeshake buffersize=%d pack_size=%d pack_head_length=%d mask=%d ismask=%d hasunmask_size=%d\n" @@ -412,11 +418,13 @@ push_more(lua_State *L, int fd, uint8_t *buffer, int size, int wsocket_handeshak struct uncomplete * uc = save_uncomplete(L, fd); uc->read = -1; - if (wsocket_handeshake && size > HEADERSIZE) { + if (wsocket_handeshake && size > HEADERSIZE) + { uc->header_size = HEADERSIZE; memcpy(uc->header, buffer, HEADERSIZE); } - else { + else + { uc->header_size += size; memcpy(uc->header, buffer, size); } @@ -478,14 +486,14 @@ filter_data_(lua_State *L, int fd, uint8_t * buffer, int size, int wsocket_hande total_size += size; //print_bin("fileter", buffer, size); - //printf("totalsize=%d filter_data size=%d wsocket_handeshake=%d\n", total_size, size, wsocket_handeshake); + skynet_error(NULL, "totalsize:%d filter_data - size:%d,wsocket_handeshake:%d", total_size, size, wsocket_handeshake); if (uc) { // fill uncomplete if (uc->read < 0) { - //printf("uc->read < 0: buffersize=%d wsocket_handeshake=%d uc.header_size=%d\n", size, wsocket_handeshake, uc->header_size); + skynet_error(NULL, "uc->read < 0: buffersize:%d,wsocket_handeshake:%d,uc.header_size:%d", size, wsocket_handeshake, uc->header_size); // read size int index = 0; while (size > 0) @@ -537,7 +545,7 @@ filter_data_(lua_State *L, int fd, uint8_t * buffer, int size, int wsocket_hande int h = hash_fd(fd); uc->next = q->hash[h]; q->hash[h] = uc; - return 1; + return 1; } else if (pack_size == -3) { close_uncomplete(L, fd); lua_pushvalue(L, lua_upvalueindex(TYPE_WSCLOSE)); @@ -827,6 +835,7 @@ lpack(lua_State *L) { } else { + // 7 bits, 7+16 bits, or 7+64 bits if (len < 65536) { frame_header[pos++] = FRAME_SET_MASK(0) | 126; diff --git a/game_room/lualib/utility/gateserver_ws.lua b/game_room/lualib/utility/gateserver_ws.lua index eee4f5f..791307e 100644 --- a/game_room/lualib/utility/gateserver_ws.lua +++ b/game_room/lualib/utility/gateserver_ws.lua @@ -109,15 +109,12 @@ end function gateserver.openclient(fd) if connection[fd] ~= nil and connection[fd].isconnect then socketdriver.start(fd) - return true end - return false end function gateserver.closeclient(fd) local c = connection[fd] if c then - client_number = client_number - 1 connection[fd] = nil socketdriver.close(fd) end @@ -189,6 +186,7 @@ function gateserver.start(handler) else skynet.error(string.format("Drop message from fd (%d) : %s", fd, netpack.tostring(msg,sz))) end + return end MSG.data = dispatch_msg diff --git a/game_room/service/wsGateway.lua b/game_room/service/wsGateway.lua index 5967cdb..51573b9 100644 --- a/game_room/service/wsGateway.lua +++ b/game_room/service/wsGateway.lua @@ -16,9 +16,9 @@ local function unforward(c) skynet.error(string.format("%s unforward agent",SERVICE_NAME), c.agent) if c.agent then skynet.error(string.format("%s unforward close fd=%d",SERVICE_NAME, c.fd)) - print("aaaaaaaaaaaaaaaaaaaaa") + skynet.error("aaaaaaaaaaaaaaaaaaaaa") skynet.send(c.agent, "lua", "exit") - print("bbbbbbbbbbbbbbbbbbbbb") + skynet.error("bbbbbbbbbbbbbbbbbbbbb") _forwarding[c.agent] = nil c.agent = nil end @@ -27,14 +27,14 @@ end local function close_fd(fd) local c = _connection[fd] skynet.error(string.format("%s, close_fd - fd:%d", SERVICE_NAME,fd), c) - print("ffffffffffffffffffffffffff") + skynet.error("ffffffffffffffffffffffffff") if c then - print("cccccccccccccccccccccc") + skynet.error("cccccccccccccccccccccc") unforward(c) - print("ddddddddddddddddddddddddd") + skynet.error("ddddddddddddddddddddddddd") _connection[fd] = nil end - print("eeeeeeeeeeeeeeeeeeeee") + skynet.error("eeeeeeeeeeeeeeeeeeeee") end local function onTimercheckTimeout() @@ -71,7 +71,7 @@ function CMD.accept(source, fd) end function CMD.kick(source, fd) - skynet.error(string.format("%s, kick -fd:%d", SERVICE_NAME,fd)) + skynet.error(string.format("%s, kick - fd:%d", SERVICE_NAME,fd)) gateserver.closeclient(fd) end @@ -109,16 +109,16 @@ function handler.message(fd, msg, sz) local c = _connection[fd] skynet.error(string.format("%s, message - fd:%d", SERVICE_NAME,fd),c) if c then - print("gggggggggggggg,",c.agent) + skynet.error("gggggggggggggg,",c.agent) end - print("hhhhhhhhhhhhhhhhhhhhhhhh") + skynet.error("hhhhhhhhhhhhhhhhhhhhhhhh") if c and c.agent then c.tick = skynet.now() skynet.redirect(c.agent, 0, "wireWebSocketStr", 0, msg, sz) else - print("iiiiiiiiiiiiiiiiiiiiiiiii") + skynet.error("iiiiiiiiiiiiiiiiiiiiiiiii") close_fd(fd) - print("jjjjjjjjjjjjjjjjjjjjjjj") + skynet.error("jjjjjjjjjjjjjjjjjjjjjjj") end end @@ -145,9 +145,9 @@ end function handler.disconnect(fd) skynet.error(string.format("%s, disconnect - fd:%d", SERVICE_NAME,fd)) - print("kkkkkkkkkkkkkkkkk") + skynet.error("kkkkkkkkkkkkkkkkk") close_fd(fd) - print("llllllllllllllllllllll") + skynet.error("llllllllllllllllllllll") end function handler.wsdisconnect(fd) diff --git a/skynet/3rd/lua/lauxlib.c b/skynet/3rd/lua/lauxlib.c index 63387b7..b77062b 100644 --- a/skynet/3rd/lua/lauxlib.c +++ b/skynet/3rd/lua/lauxlib.c @@ -699,6 +699,7 @@ static int skipcomment (LoadF *lf, int *cp) { else return 0; /* no comment */ } +LUA_API void luaS_expandshr(int n); static int luaL_loadfilex_ (lua_State *L, const char *filename, const char *mode) { @@ -724,7 +725,9 @@ static int luaL_loadfilex_ (lua_State *L, const char *filename, } if (c != EOF) lf.buff[lf.n++] = c; /* 'c' is the first character of the stream */ + luaS_expandshr(4096); status = lua_load(L, getF, &lf, lua_tostring(L, -1), mode); + luaS_expandshr(-4096); readstatus = ferror(lf.f); if (filename) fclose(lf.f); /* close file (even in case of errors) */ if (readstatus) { @@ -1091,21 +1094,20 @@ save(const char *key, const void * proto) { SPIN_LOCK(&CC) if (CC.L == NULL) { init(); - L = CC.L; + } + L = CC.L; + lua_pushstring(L, key); + lua_pushvalue(L, -1); + lua_rawget(L, LUA_REGISTRYINDEX); + result = lua_touserdata(L, -1); /* stack: key oldvalue */ + if (result == NULL) { + lua_pop(L,1); + lua_pushlightuserdata(L, (void *)proto); + lua_rawset(L, LUA_REGISTRYINDEX); } else { - L = CC.L; - lua_pushstring(L, key); - lua_pushvalue(L, -1); - lua_rawget(L, LUA_REGISTRYINDEX); - result = lua_touserdata(L, -1); /* stack: key oldvalue */ - if (result == NULL) { - lua_pop(L,1); - lua_pushlightuserdata(L, (void *)proto); - lua_rawset(L, LUA_REGISTRYINDEX); - } else { - lua_pop(L,2); - } + lua_pop(L,2); } + SPIN_UNLOCK(&CC) return result; } @@ -1152,8 +1154,6 @@ static int cache_mode(lua_State *L) { return 0; } -LUA_API void luaS_expandshr(int n); - LUALIB_API int luaL_loadfilex (lua_State *L, const char *filename, const char *mode) { int level = cache_level(L); @@ -1173,7 +1173,6 @@ LUALIB_API int luaL_loadfilex (lua_State *L, const char *filename, lua_pushliteral(L, "New state failed"); return LUA_ERRMEM; } - luaS_expandshr(4096); int err = luaL_loadfilex_(eL, filename, mode); if (err != LUA_OK) { size_t sz = 0; diff --git a/skynet/3rd/lua/lstring.c b/skynet/3rd/lua/lstring.c index 6aea450..095771e 100644 --- a/skynet/3rd/lua/lstring.c +++ b/skynet/3rd/lua/lstring.c @@ -271,9 +271,8 @@ Udata *luaS_newudata (lua_State *L, size_t s) { #include "atomic.h" #include -#define SHRSTR_SLOT 0x10000 -#define HASH_NODE(h) ((h) % SHRSTR_SLOT) #define getaddrstr(ts) (cast(char *, (ts)) + sizeof(UTString)) +#define SHRSTR_INITSIZE 0x10000 struct shrmap_slot { struct rwlock lock; @@ -281,63 +280,188 @@ struct shrmap_slot { }; struct shrmap { - struct shrmap_slot h[SHRSTR_SLOT]; + struct rwlock lock; int n; + int mask; + int total; + int roslots; + struct shrmap_slot * readwrite; + struct shrmap_slot * readonly; }; static struct shrmap SSM; -LUA_API void -luaS_initshr() { - struct shrmap * s = &SSM; +static struct shrmap_slot * +shrstr_newpage(int sz) { int i; - for (i=0;ih[i].lock); + struct shrmap_slot * s = (struct shrmap_slot *)malloc(sz * sizeof(*s)); + if (s == NULL) + return NULL; + for (i=0;iu.hnext; + free(str); + str = next; + } + } + free(s); + } +} + +static int +shrstr_allocpage(struct shrmap * s, int osz, int sz, struct shrmap_slot * newpage) { + if (s->readonly != NULL) + return 0; + if ((s->mask + 1) != osz) + return 0; + s->readonly = s->readwrite; + s->readwrite = newpage; + s->roslots = s->mask + 1; + s->mask = sz - 1; + + return 1; +} + +static void +shrstr_rehash(struct shrmap *s, int slotid) { + struct shrmap_slot *slot = &s->readonly[slotid]; + rwlock_wlock(&slot->lock); + TString *str = slot->str; while (str) { TString * next = str->u.hnext; - free(str); + int newslotid = str->hash & s->mask; + struct shrmap_slot *newslot = &s->readwrite[newslotid]; + rwlock_wlock(&newslot->lock); + str->u.hnext = newslot->str; + newslot->str = str; + rwlock_wunlock(&newslot->lock); str = next; } + + slot->str = NULL; + rwlock_wunlock(&slot->lock); +} + +/* + 1. writelock SSM if readonly == NULL, (Only one thread can expand) + 2. move old page (readwrite) to readonly + 3. new (empty) page with double size to readwrite + 4. unlock SSM + 5. rehash every slots + 6. remove temporary readonly (writelock SSM) + */ +static void +shrstr_expandpage(int cap) { + struct shrmap * s = &SSM; + if (s->readonly) + return; + int osz = s->mask + 1; + int sz = osz * 2; + while (sz < cap) { + // overflow check + if (sz <= 0) + return; + sz = sz * 2; } + struct shrmap_slot * newpage = shrstr_newpage(sz); + if (newpage == NULL) + return; + rwlock_wlock(&s->lock); + int succ = shrstr_allocpage(s, osz, sz, newpage); + rwlock_wunlock(&s->lock); + if (!succ) { + shrstr_deletepage(newpage, sz); + return; + } + int i; + for (i=0;ilock); + struct shrmap_slot * oldpage = s->readonly; + s->readonly = NULL; + rwlock_wunlock(&s->lock); + shrstr_deletepage(oldpage, osz); +} + +LUA_API void +luaS_initshr() { + struct shrmap * s = &SSM; + rwlock_init(&s->lock); + s->n = 0; + s->mask = SHRSTR_INITSIZE - 1; + s->readwrite = shrstr_newpage(SHRSTR_INITSIZE); + s->readonly = NULL; + LNGSTR_SEED = make_lstr_seed(); +} + +LUA_API void +luaS_exitshr() { + struct shrmap * s = &SSM; + rwlock_wlock(&s->lock); + int sz = s->mask + 1; + shrstr_deletepage(s->readwrite, sz); + shrstr_deletepage(s->readonly, s->roslots); + s->readwrite = NULL; + s->readonly = NULL; } static TString * -query_string(unsigned int h, const char *str, lu_byte l) { - struct shrmap_slot *s = &SSM.h[HASH_NODE(h)]; - rwlock_rlock(&s->lock); - TString *ts = s->str; - while (ts) { - if (ts->hash == h && - ts->shrlen == l && - memcmp(str, ts+1, l) == 0) { - break; +find_string(TString *t, struct shrmap_slot * slot, unsigned int h, const char *str, lu_byte l) { + TString *ts = slot->str; + if (t) { + while (ts) { + if (ts == t) + break; + ts = ts->u.hnext; + } + } else { + while (ts) { + if (ts->hash == h && + ts->shrlen == l && + memcmp(str, ts+1, l) == 0) { + break; + } + ts = ts->u.hnext; } - ts = ts->u.hnext; } - rwlock_runlock(&s->lock); return ts; } +/* + 1. readlock SSM + 2. find string in readwrite page + 3. find string in readonly (if exist, during exapnding) + 4. unlock SSM + */ static TString * -query_ptr(TString *t) { - unsigned int h = t->hash; - struct shrmap_slot *s = &SSM.h[HASH_NODE(h)]; +query_string(TString *t, unsigned int h, const char *str, lu_byte l) { + struct shrmap * s = &SSM; + TString *ts = NULL; rwlock_rlock(&s->lock); - TString *ts = s->str; - while (ts) { - if (ts == t) - break; - ts = ts->u.hnext; - } + struct shrmap_slot *slot = &s->readwrite[h & s->mask]; + rwlock_rlock(&slot->lock); + ts = find_string(t, slot, h, str, l); + rwlock_runlock(&slot->lock); + if (ts == NULL && s->readonly != NULL) { + int mask = s->roslots - 1; + slot = &s->readonly[h & mask]; + rwlock_rlock(&slot->lock); + ts = find_string(t, slot, h, str, l); + rwlock_runlock(&slot->lock); + } rwlock_runlock(&s->lock); return ts; } @@ -354,31 +478,47 @@ new_string(unsigned int h, const char *str, lu_byte l) { return ts; } +static TString * +shrstr_exist(struct shrmap * s, unsigned int h, const char *str, lu_byte l) { + TString *found; + if (s->readonly) { + unsigned int mask = s->roslots - 1; + struct shrmap_slot *slot = &s->readonly[h & mask]; + rwlock_rlock(&slot->lock); + found = find_string(NULL, slot, h, str, l); + rwlock_runlock(&slot->lock); + if (found) + return found; + } + struct shrmap_slot *slot = &s->readwrite[h & s->mask]; + rwlock_wlock(&slot->lock); + found = find_string(NULL, slot, h, str, l); + if (found) { + rwlock_wunlock(&slot->lock); + return found; + } + // not found, lock slot and return. + return NULL; +} + static TString * add_string(unsigned int h, const char *str, lu_byte l) { + struct shrmap * s = &SSM; TString * tmp = new_string(h, str, l); - struct shrmap_slot *s = &SSM.h[HASH_NODE(h)]; - rwlock_wlock(&s->lock); - TString *ts = s->str; - while (ts) { - if (ts->hash == h && - ts->shrlen == l && - memcmp(str, ts+1, l) == 0) { - break; + rwlock_rlock(&s->lock); + struct TString *ts = shrstr_exist(s, h, str, l); + if (ts) { + // string is create by other thread, so free tmp + free(tmp); + } else { + struct shrmap_slot *slot = &s->readwrite[h & s->mask]; + ts = tmp; + ts->u.hnext = slot->str; + slot->str = ts; + rwlock_wunlock(&slot->lock); + ATOM_INC(&SSM.total); } - ts = ts->u.hnext; - } - if (ts == NULL) { - ts = tmp; - ts->u.hnext = s->str; - s->str = ts; - tmp = NULL; - } - rwlock_wunlock(&s->lock); - if (tmp) { - // string is create by other thread, so free tmp - free(tmp); - } + rwlock_runlock(&s->lock); return ts; } @@ -394,7 +534,7 @@ internshrstr (lua_State *L, const char *str, size_t l) { return ts; // lookup SSM again h0 = luaS_hash(str, l, 0); - ts = query_string(h0, str, l); + ts = query_string(NULL, h0, str, l); if (ts) return ts; // If SSM.n greate than 0, add it to SSM @@ -408,8 +548,19 @@ internshrstr (lua_State *L, const char *str, size_t l) { LUA_API void luaS_expandshr(int n) { - if (SSM.n < n) - ATOM_ADD(&SSM.n, n); + struct shrmap * s = &SSM; + if (n < 0) { + if (-n > s->n) { + n = -s->n; + } + } + ATOM_ADD(&s->n, n); + if (n > 0) { + int t = (s->total + s->n) * 5 / 4; + if (t > s->mask) { + shrstr_expandpage(t); + } + } } LUAI_FUNC TString * @@ -428,11 +579,11 @@ luaS_clonestring(lua_State *L, TString *ts) { if (result) return result; // look up SSM by ptr - result = query_ptr(ts); + result = query_string(ts, ts->hash, NULL, 0); if (result) return result; h = luaS_hash(str, l, 0); - result = query_string(h, str, l); + result = query_string(NULL, h, str, l); if (result) return result; // ts is not in SSM, so recalc hash, and add it to SSM @@ -457,25 +608,70 @@ getslot(struct shrmap_slot *s, struct slotinfo *info) { rwlock_runlock(&s->lock); } + +struct variance { + int count; + double mean; + double m2; +}; + +static void +variance_update(struct variance *v, int newValue_) { + double newValue = (double)newValue_; + ++v->count; + double delta = newValue - v->mean; + v->mean += delta / v->count; + double delta2 = newValue - v->mean; + v->m2 += delta * delta2; +} + LUA_API int luaS_shrinfo(lua_State *L) { struct slotinfo total; struct slotinfo tmp; memset(&total, 0, sizeof(total)); - int i; - int len = 0; - for (i=0;i total.len) { - total.len = tmp.len; + struct shrmap * s = &SSM; + struct variance v = { 0,0,0 }; + int slots = 0; + rwlock_rlock(&s->lock); + int i; + int sz = s->mask + 1; + for (i=0;ireadwrite[i]; + getslot(slot, &tmp); + if (tmp.len > 0) { + if (tmp.len > total.len) { + total.len = tmp.len; + } + total.size += tmp.size; + variance_update(&v, tmp.len); + ++slots; + } } - total.size += tmp.size; + if (s->readonly) { + sz = s->roslots; + for (i=0;ireadonly[i]; + getslot(slot, &tmp); + if (tmp.len > 0) { + if (tmp.len > total.len) { + total.len = tmp.len; + } + total.size += tmp.size; + variance_update(&v, tmp.len); + } + } + } + rwlock_runlock(&s->lock); + lua_pushinteger(L, SSM.total); // count + lua_pushinteger(L, total.size); // total size + lua_pushinteger(L, total.len); // longest + lua_pushinteger(L, SSM.n); // space + lua_pushinteger(L, slots); // slots + if (v.count > 1) { + lua_pushnumber(L, v.m2 / v.count); // variance + } else { + lua_pushnumber(L, 0); // variance } - lua_pushinteger(L, len); - lua_pushinteger(L, total.size); - lua_pushinteger(L, total.len); - lua_pushinteger(L, SSM.n); - return 4; + return 6; } diff --git a/skynet/Makefile b/skynet/Makefile index 127d755..004aabb 100644 --- a/skynet/Makefile +++ b/skynet/Makefile @@ -41,7 +41,7 @@ $(JEMALLOC_STATICLIB) : 3rd/jemalloc/Makefile git submodule update --init 3rd/jemalloc/Makefile : | 3rd/jemalloc/autogen.sh - cd 3rd/jemalloc && ./autogen.sh --with-jemalloc-prefix=je_ --disable-valgrind + cd 3rd/jemalloc && ./autogen.sh --with-jemalloc-prefix=je_ --enable-prof jemalloc : $(MALLOC_STATICLIB) diff --git a/skynet/lualib-src/ltls.c b/skynet/lualib-src/ltls.c index 74b3a3b..00716b2 100644 --- a/skynet/lualib-src/ltls.c +++ b/skynet/lualib-src/ltls.c @@ -119,19 +119,20 @@ _bio_read(lua_State* L, struct tls_context* tls_p) { if(pending >0) { luaL_Buffer b; luaL_buffinit(L, &b); - do { + while(pending > 0) { read = BIO_read(tls_p->out_bio, outbuff, sizeof(outbuff)); - // printf("_bio_read:%d pending:%d\n", read, pending); - if(read > sizeof(outbuff)) { - luaL_error(L, "invalid BIO_read:%d", read); - }else if(read == -2) { - luaL_error(L, "BIO_read not implemented in the specific BIO type"); - }else if (read > 0) { + // printf("BIO_read read:%d pending:%d\n", read, pending); + if(read <= 0) { + luaL_error(L, "BIO_read error:%d", read); + }else if(read <= sizeof(outbuff)) { all_read += read; luaL_addlstring(&b, (const char*)outbuff, read); + }else { + luaL_error(L, "invalid BIO_read:%d", read); } - }while(read == sizeof(outbuff)); - if(all_read>0){ + pending = BIO_ctrl_pending(tls_p->out_bio); + } + if(all_read>0) { luaL_pushresult(&b); } } @@ -145,13 +146,14 @@ _bio_write(lua_State* L, struct tls_context* tls_p, const char* s, size_t len) { size_t sz = len; while(sz > 0) { int written = BIO_write(tls_p->in_bio, p, sz); - if(written > sz) { - luaL_error(L, "invalid BIO_write"); - }else if(written > 0) { + // printf("BIO_write written:%d sz:%zu\n", written, sz); + if(written <= 0) { + luaL_error(L, "BIO_write error:%d", written); + }else if (written <= sz) { p += written; sz -= written; - }else if (written == -2) { - luaL_error(L, "BIO_write not implemented in the specific BIO type"); + }else { + luaL_error(L, "invalid BIO_write:%d", written); } } } @@ -178,10 +180,15 @@ _ltls_context_handshake(lua_State* L) { int ret = SSL_do_handshake(tls_p->ssl); if(ret == 1) { return 0; - } else if (ret == -1) { - int all_read = _bio_read(L, tls_p); - if(all_read>0) { - return 1; + } else if (ret < 0) { + int err = SSL_get_error(tls_p->ssl, ret); + if(err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) { + int all_read = _bio_read(L, tls_p); + if(all_read>0) { + return 1; + } + } else { + luaL_error(L, "SSL_do_handshake error:%d ret:%d", err, ret); } } else { int err = SSL_get_error(tls_p->ssl, ret); @@ -210,18 +217,18 @@ _ltls_context_read(lua_State* L) { do { read = SSL_read(tls_p->ssl, outbuff, sizeof(outbuff)); - if(read < 0) { + if(read <= 0) { int err = SSL_get_error(tls_p->ssl, read); - if(err == SSL_ERROR_WANT_READ) { + if(err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) { break; } luaL_error(L, "SSL_read error:%d", err); - }else if(read > sizeof(outbuff)) { - luaL_error(L, "invalid SSL_read"); - }else if (read > 0) { + }else if(read <= sizeof(outbuff)) { luaL_addlstring(&b, outbuff, read); + }else { + luaL_error(L, "invalid SSL_read:%d", read); } - }while(read == sizeof(outbuff)); + }while(true); luaL_pushresult(&b); return 1; } @@ -233,16 +240,16 @@ _ltls_context_write(lua_State* L) { size_t slen = 0; char* unencrypted_data = (char*)lua_tolstring(L, 2, &slen); - while(slen >0) { + while(slen > 0) { int written = SSL_write(tls_p->ssl, unencrypted_data, slen); - if(written < 0) { + if(written <= 0) { int err = SSL_get_error(tls_p->ssl, written); luaL_error(L, "SSL_write error:%d", err); - }else if(written > slen) { - luaL_error(L, "invalid SSL_write"); - }else if(written>0) { + }else if(written <= slen) { unencrypted_data += written; slen -= written; + }else { + luaL_error(L, "invalid SSL_write:%d", written); } } diff --git a/skynet/lualib-src/lua-memory.c b/skynet/lualib-src/lua-memory.c index 5fc7575..426acf1 100644 --- a/skynet/lualib-src/lua-memory.c +++ b/skynet/lualib-src/lua-memory.c @@ -49,6 +49,26 @@ lcurrent(lua_State *L) { return 1; } +static int +ldumpheap(lua_State *L) { + mallctl_cmd("prof.dump"); + return 0; +} + +static int +lprofactive(lua_State *L) { + bool *pval, active; + if (lua_isnone(L, 1)) { + pval = NULL; + } else { + active = lua_toboolean(L, 1) ? true : false; + pval = &active; + } + bool ret = mallctl_bool("prof.active", pval); + lua_pushboolean(L, ret); + return 1; +} + LUAMOD_API int luaopen_skynet_memory(lua_State *L) { luaL_checkversion(L); @@ -62,6 +82,8 @@ luaopen_skynet_memory(lua_State *L) { { "ssinfo", luaS_shrinfo }, { "ssexpand", lexpandshrtbl }, { "current", lcurrent }, + { "dumpheap", ldumpheap }, + { "profactive", lprofactive }, { NULL, NULL }, }; diff --git a/skynet/lualib/http/tlshelper.lua b/skynet/lualib/http/tlshelper.lua index 553acd9..e25b229 100644 --- a/skynet/lualib/http/tlshelper.lua +++ b/skynet/lualib/http/tlshelper.lua @@ -52,7 +52,9 @@ function tlshelper.readfunc(fd, tls_ctx) local ds = readfunc(sz) s = tls_ctx:read(ds) end - return read_buff .. s + s = read_buff .. s + read_buff = "" + return s else while #read_buff < sz do local ds = readfunc() diff --git a/skynet/lualib/skynet.lua b/skynet/lualib/skynet.lua index 73055d4..a49f081 100644 --- a/skynet/lualib/skynet.lua +++ b/skynet/lualib/skynet.lua @@ -6,6 +6,8 @@ local assert = assert local pairs = pairs local pcall = pcall local table = table +local tremove = table.remove +local tinsert = table.insert local profile = require "skynet.profile" @@ -70,7 +72,7 @@ local suspend ----- monitor exit local function dispatch_error_queue() - local session = table.remove(error_queue,1) + local session = tremove(error_queue,1) if session then local co = session_id_coroutine[session] session_id_coroutine[session] = nil @@ -89,13 +91,13 @@ local function _error_dispatch(error_session, error_source) end for session, srv in pairs(watching_session) do if srv == error_source then - table.insert(error_queue, session) + tinsert(error_queue, session) end end else -- capture an error for error_session if watching_session[error_session] then - table.insert(error_queue, error_session) + tinsert(error_queue, error_session) end end end @@ -105,7 +107,7 @@ end local coroutine_pool = setmetatable({}, { __mode = "kv" }) local function co_create(f) - local co = table.remove(coroutine_pool) + local co = tremove(coroutine_pool) if co == nil then co = coroutine_create(function(...) f(...) @@ -148,7 +150,7 @@ local function co_create(f) end local function dispatch_wakeup() - local token = table.remove(wakeup_queue,1) + local token = tremove(wakeup_queue,1) if token then local session = sleep_session[token] if session then @@ -479,7 +481,7 @@ end function skynet.wakeup(token) if sleep_session[token] then - table.insert(wakeup_queue, token) + tinsert(wakeup_queue, token) return true end end @@ -526,7 +528,7 @@ function skynet.fork(func,...) local args = { ... } co = co_create(function() func(table.unpack(args,1,n)) end) end - table.insert(fork_queue, co) + tinsert(fork_queue, co) return co end @@ -597,11 +599,10 @@ end function skynet.dispatch_message(...) local succ, err = pcall(raw_dispatch_message,...) while true do - local key,co = next(fork_queue) + local co = tremove(fork_queue,1) if co == nil then break end - fork_queue[key] = nil local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co)) if not fork_succ then if succ then @@ -689,7 +690,7 @@ function skynet.init(f, name) if init_func == nil then f() else - table.insert(init_func, f) + tinsert(init_func, f) if name then assert(type(name) == "string") assert(init_func[name] == nil) diff --git a/skynet/lualib/skynet/cluster.lua b/skynet/lualib/skynet/cluster.lua index c1633d3..2472b70 100644 --- a/skynet/lualib/skynet/cluster.lua +++ b/skynet/lualib/skynet/cluster.lua @@ -3,23 +3,67 @@ local skynet = require "skynet" local clusterd local cluster = {} local sender = {} +local task_queue = {} -local function get_sender(t, node) - local c = skynet.call(clusterd, "lua", "sender", node) - t[node] = c - return c +local function request_sender(q, node) + local ok, c = pcall(skynet.call, clusterd, "lua", "sender", node) + if not ok then + skynet.error(c) + c = nil + end + -- run tasks in queue + local confirm = coroutine.running() + q.confirm = confirm + q.sender = c + for _, task in ipairs(q) do + if type(task) == "table" then + if c then + skynet.send(c, "lua", "push", task[1], skynet.pack(table.unpack(task,2,task.n))) + end + else + skynet.wakeup(task) + skynet.wait(confirm) + end + end + task_queue[node] = nil + sender[node] = c end -setmetatable(sender, { __index = get_sender } ) +local function get_queue(t, node) + local q = {} + t[node] = q + skynet.fork(request_sender, q, node) + return q +end + +setmetatable(task_queue, { __index = get_queue } ) + +local function get_sender(node) + local s = sender[node] + if not s then + local q = task_queue[node] + local task = coroutine.running() + table.insert(q, task) + skynet.wait(task) + skynet.wakeup(q.confirm) + return q.sender + end + return s +end function cluster.call(node, address, ...) -- skynet.pack(...) will free by cluster.core.packrequest - return skynet.call(sender[node], "lua", "req", address, skynet.pack(...)) + return skynet.call(get_sender(node), "lua", "req", address, skynet.pack(...)) end function cluster.send(node, address, ...) -- push is the same with req, but no response - skynet.send(sender[node], "lua", "push", address, skynet.pack(...)) + local s = sender[node] + if not s then + table.insert(task_queue[node], table.pack(address, ...)) + else + skynet.send(sender[node], "lua", "push", address, skynet.pack(...)) + end end function cluster.open(port) @@ -54,7 +98,7 @@ function cluster.register(name, addr) end function cluster.query(node, name) - return skynet.call(sender[node], "lua", "req", 0, skynet.pack(name)) + return skynet.call(get_sender(node), "lua", "req", 0, skynet.pack(name)) end skynet.init(function() diff --git a/skynet/lualib/skynet/datasheet/builder.lua b/skynet/lualib/skynet/datasheet/builder.lua index 1683a10..7e43c0e 100644 --- a/skynet/lualib/skynet/datasheet/builder.lua +++ b/skynet/lualib/skynet/datasheet/builder.lua @@ -69,6 +69,13 @@ local skynet = require "skynet" local datasheet = {} local handles = {} -- handle:{ ref:count , name:name , collect:resp } local dataset = {} -- name:{ handle:handle, monitor:{monitors queue} } +local customers = {} -- source: { handle:true } + +setmetatable(customers, { __index = function(c, source) + local v = {} + c[source] = v + return v +end } ) local function releasehandle(source, handle) local h = handles[handle] @@ -109,6 +116,7 @@ function datasheet.query(source, name) local handle = t.handle local h = handles[handle] h.ref = h.ref + 1 + customers[source][handle] = true skynet.ret(skynet.pack(handle)) end @@ -117,19 +125,35 @@ function datasheet.monitor(source, handle) local h = assert(handles[handle], "Invalid data handle") local t = dataset[h.name] if t.handle ~= handle then -- already changes + customers[source][t.handle] = true skynet.ret(skynet.pack(t.handle)) else assert(not t.monitor[source]) - t.monitor[source]=skynet.response() + local resp = skynet.response() + t.monitor[source]= function(ok, handle) + if ok then + customers[source][handle] = true + end + resp(ok, handle) + end end end -- from customers, release handle , ref count - 1 function datasheet.release(source, handle) -- send message, don't ret + customers[source][handle] = nil releasehandle(source, handle) end +-- customer closed, clear all handles it queried +function datasheet.close(source) + for handle in pairs(customers[source]) do + releasehandle(source, handle) + end + customers[source] = nil +end + -- from builder, monitor handle release function datasheet.collect(source, handle) local h = assert(handles[handle], "Invalid data handle") diff --git a/skynet/lualib/skynet/datasheet/init.lua b/skynet/lualib/skynet/datasheet/init.lua index c902b98..69c9c6d 100644 --- a/skynet/lualib/skynet/datasheet/init.lua +++ b/skynet/lualib/skynet/datasheet/init.lua @@ -11,9 +11,7 @@ end) local datasheet = {} local sheets = setmetatable({}, { __gc = function(t) - for _,v in pairs(t) do - skynet.send(datasheet_svr, "lua", "release", v.handle) - end + skynet.send(datasheet_svr, "lua", "close") end, }) diff --git a/skynet/lualib/skynet/debug.lua b/skynet/lualib/skynet/debug.lua index 3b540fd..e202795 100644 --- a/skynet/lualib/skynet/debug.lua +++ b/skynet/lualib/skynet/debug.lua @@ -14,8 +14,8 @@ local function init(skynet, export) dbgcmd = {} function dbgcmd.MEM() - local kb, bytes = collectgarbage "count" - skynet.ret(skynet.pack(kb,bytes)) + local kb = collectgarbage "count" + skynet.ret(skynet.pack(kb)) end function dbgcmd.GC() diff --git a/skynet/lualib/snax/loginserver.lua b/skynet/lualib/snax/loginserver.lua index be88b97..7ea36b2 100644 --- a/skynet/lualib/snax/loginserver.lua +++ b/skynet/lualib/snax/loginserver.lua @@ -24,7 +24,6 @@ Protocol: 10. Server->Client : 200 base64(subid) Error Code: - 400 Bad Request . challenge failed 401 Unauthorized . unauthorized by auth_handler 403 Forbidden . login_handler failed 406 Not Acceptable . already in login (disallow multi login) @@ -70,7 +69,6 @@ local function launch_slave(auth_handler) local hmac = crypt.hmac64(challenge, secret) if hmac ~= crypt.base64decode(response) then - write("auth", fd, "400 Bad Request\n") error "challenge failed" end diff --git a/skynet/service/clusterd.lua b/skynet/service/clusterd.lua index 59e2ae9..f891df2 100644 --- a/skynet/service/clusterd.lua +++ b/skynet/service/clusterd.lua @@ -1,23 +1,14 @@ local skynet = require "skynet" require "skynet.manager" -local sc = require "skynet.socketchannel" -local socket = require "skynet.socket" local cluster = require "skynet.cluster.core" local config_name = skynet.getenv "cluster" local node_address = {} -local node_session = {} local node_sender = {} local command = {} local config = {} local nodename = cluster.nodename() -local function read_response(sock) - local sz = socket.header(sock:read(2)) - local msg = sock:read(sz) - return cluster.unpackresponse(msg) -- session, ok, data, padding -end - local connecting = {} local function open_channel(t, key) @@ -44,7 +35,7 @@ local function open_channel(t, key) local host, port = string.match(address, "([^:]+):(.*)$") c = node_sender[key] if c == nil then - c = skynet.newservice "clustersender" + c = skynet.newservice("clustersender", key, nodename) if node_sender[key] then -- double check skynet.kill(c) @@ -143,10 +134,18 @@ function command.proxy(source, node, name) end end local fullname = node .. "." .. name - if proxy[fullname] == nil then - proxy[fullname] = skynet.newservice("clusterproxy", node, name) + local p = proxy[fullname] + if p == nil then + p = skynet.newservice("clusterproxy", node, name) + -- double check + if proxy[fullname] then + skynet.kill(p) + p = proxy[fullname] + else + proxy[fullname] = p + end end - skynet.ret(skynet.pack(proxy[fullname])) + skynet.ret(skynet.pack(p)) end local cluster_agent = {} -- fd:service diff --git a/skynet/service/clustersender.lua b/skynet/service/clustersender.lua index 250d70b..992373e 100644 --- a/skynet/service/clustersender.lua +++ b/skynet/service/clustersender.lua @@ -2,21 +2,21 @@ local skynet = require "skynet" local sc = require "skynet.socketchannel" local socket = require "skynet.socket" local cluster = require "skynet.cluster.core" -local ignoreret = skynet.ignoreret local channel local session = 1 +local node, nodename = ... local command = {} local waiting = {} + local function send_request(addr, msg, sz) -- msg is a local pointer, cluster.packrequest will free it local current_session = session local request, new_session, padding = cluster.packrequest(addr, session, msg, sz) session = new_session - -- node_channel[node] may yield or throw error local tracetag = skynet.tracetag() if tracetag then if tracetag:sub(1,1) ~= "(" then @@ -63,7 +63,6 @@ function command.push(addr, msg, sz) session = new_session end - -- node_channel[node] may yield or throw error channel:request(request, nil, padding) end @@ -80,7 +79,7 @@ function command.changenode(host, port) response = read_response, nodelay = true, } - succ, err = pcall(c.connect, c, true) + local succ, err = pcall(c.connect, c, true) if channel then channel:close() end @@ -103,6 +102,3 @@ skynet.start(function() f(...) end) end) - - - diff --git a/skynet/service/debug_console.lua b/skynet/service/debug_console.lua index dbde18e..0323c21 100644 --- a/skynet/service/debug_console.lua +++ b/skynet/service/debug_console.lua @@ -160,6 +160,8 @@ function COMMAND.help() call = "call address ...", trace = "trace address [proto] [on|off]", netstat = "netstat : show netstat", + profactive = "profactive [on|off] : active/deactive jemalloc heap profilling", + dumpheap = "dumpheap : dump heap profilling", } end @@ -336,8 +338,8 @@ function COMMAND.cmem() end function COMMAND.shrtbl() - local n, total, longest, space = memory.ssinfo() - return { n = n, total = total, longest = longest, space = space } + local n, total, longest, space, slots, variance = memory.ssinfo() + return { n = n, total = total, longest = longest, space = space, slots = slots, average = n / slots, variace = variance } end function COMMAND.ping(address) @@ -421,4 +423,19 @@ function COMMAND.netstat() convert_stat(info) end return stat -end \ No newline at end of file +end + +function COMMAND.dumpheap() + memory.dumpheap() +end + +function COMMAND.profactive(flag) + if flag ~= nil then + if flag == "on" or flag == "off" then + flag = toboolean(flag) + end + memory.profactive(flag) + end + local active = memory.profactive() + return "heap profilling is ".. (active and "active" or "deactive") +end diff --git a/skynet/service/launcher.lua b/skynet/service/launcher.lua index d458d13..77300a2 100644 --- a/skynet/service/launcher.lua +++ b/skynet/service/launcher.lua @@ -45,7 +45,7 @@ end function command.MEM() local list = {} for k,v in pairs(services) do - local ok, kb, bytes = pcall(skynet.call,k,"debug","MEM") + local ok, kb = pcall(skynet.call,k,"debug","MEM") if not ok then list[skynet.address(k)] = string.format("ERROR (%s)",v) else diff --git a/skynet/service/service_provider.lua b/skynet/service/service_provider.lua index b04067c..1d0e238 100644 --- a/skynet/service/service_provider.lua +++ b/skynet/service/service_provider.lua @@ -41,6 +41,9 @@ end function provider.launch(name, code, ...) local s = svr[name] + if s.address then + return skynet.ret(skynet.pack(s.address)) + end if s.booting then table.insert(s.queue, skynet.response()) else diff --git a/skynet/skynet-src/malloc_hook.c b/skynet/skynet-src/malloc_hook.c index 02b6c09..150c2cc 100644 --- a/skynet/skynet-src/malloc_hook.c +++ b/skynet/skynet-src/malloc_hook.c @@ -65,10 +65,10 @@ get_allocated_field(uint32_t handle) { return &data->allocated; } -inline static void +inline static void update_xmalloc_stat_alloc(uint32_t handle, size_t __n) { ATOM_ADD(&_used_memory, __n); - ATOM_INC(&_memory_block); + ATOM_INC(&_memory_block); ssize_t* allocated = get_allocated_field(handle); if(allocated) { ATOM_ADD(allocated, __n); @@ -126,12 +126,29 @@ static void malloc_oom(size_t size) { abort(); } -void +void memory_info_dump(void) { je_malloc_stats_print(0,0,0); } -size_t +bool +mallctl_bool(const char* name, bool* newval) { + bool v = 0; + size_t len = sizeof(v); + if(newval) { + je_mallctl(name, &v, &len, newval, sizeof(bool)); + } else { + je_mallctl(name, &v, &len, NULL, 0); + } + return v; +} + +int +mallctl_cmd(const char* name) { + return je_mallctl(name, NULL, NULL, NULL, 0); +} + +size_t mallctl_int64(const char* name, size_t* newval) { size_t v = 0; size_t len = sizeof(v); @@ -144,7 +161,7 @@ mallctl_int64(const char* name, size_t* newval) { return v; } -int +int mallctl_opt(const char* name, int* newval) { int v = 0; size_t len = sizeof(v); @@ -223,23 +240,35 @@ skynet_posix_memalign(void **memptr, size_t alignment, size_t size) { #define raw_realloc realloc #define raw_free free -void +void memory_info_dump(void) { skynet_error(NULL, "No jemalloc"); } -size_t +size_t mallctl_int64(const char* name, size_t* newval) { skynet_error(NULL, "No jemalloc : mallctl_int64 %s.", name); return 0; } -int +int mallctl_opt(const char* name, int* newval) { skynet_error(NULL, "No jemalloc : mallctl_opt %s.", name); return 0; } +bool +mallctl_bool(const char* name, bool* newval) { + skynet_error(NULL, "No jemalloc : mallctl_bool %s.", name); + return 0; +} + +int +mallctl_cmd(const char* name) { + skynet_error(NULL, "No jemalloc : mallctl_cmd %s.", name); + return 0; +} + #endif size_t @@ -275,7 +304,7 @@ skynet_strdup(const char *str) { return ret; } -void * +void * skynet_lalloc(void *ptr, size_t osize, size_t nsize) { if (nsize == 0) { raw_free(ptr); diff --git a/skynet/skynet-src/malloc_hook.h b/skynet/skynet-src/malloc_hook.h index 4da4123..04f522a 100644 --- a/skynet/skynet-src/malloc_hook.h +++ b/skynet/skynet-src/malloc_hook.h @@ -2,6 +2,7 @@ #define SKYNET_MALLOC_HOOK_H #include +#include #include extern size_t malloc_used_memory(void); @@ -9,6 +10,8 @@ extern size_t malloc_memory_block(void); extern void memory_info_dump(void); extern size_t mallctl_int64(const char* name, size_t* newval); extern int mallctl_opt(const char* name, int* newval); +extern bool mallctl_bool(const char* name, bool* newval); +extern int mallctl_cmd(const char* name); extern void dump_c_mem(void); extern int dump_mem_lua(lua_State *L); extern size_t malloc_current_memory(void);