Hook up close, drain, compression, "Neuter" ArrayBuffers on return

This commit is contained in:
Alex Hultman 2019-01-14 10:09:09 +01:00
parent 15684984d9
commit 647b55f5d8
2 changed files with 85 additions and 24 deletions

View File

@ -3,21 +3,26 @@ var uWS = uWS ? uWS : require('../dist/uws.js');
const port = 9001; const port = 9001;
// nice milestone to pass autobahn on both ssl and non-ssl with an without compression const app = uWS./*SSL*/App({
const app = uWS.SSLApp({
key_file_name: '/home/alexhultman/key.pem', key_file_name: '/home/alexhultman/key.pem',
cert_file_name: '/home/alexhultman/cert.pem', cert_file_name: '/home/alexhultman/cert.pem',
passphrase: '1234' passphrase: '1234'
}).get('/hello', (res, req) => { }).get('/hello', (res, req) => {
res.end('Hejdå!'); res.end('Hejdå!');
}).ws('/*', { }).ws('/*', {
compression: 0,
maxPayloadLength: 16 * 1024 * 1024, maxPayloadLength: 16 * 1024 * 1024,
open: (ws, req) => { open: (ws, req) => {
console.log(ws); console.log(ws);
}, },
message: (ws, message, isBinary) => { message: (ws, message, isBinary) => {
ws.send(message, isBinary); ws.send(message, isBinary);
},
drain: (ws) => {
console.log('WebSocket drained');
},
close: (ws, code, message) => {
console.log('WebSocket closed');
} }
}).listen(port, (token) => { }).listen(port, (token) => {
if (token) { if (token) {

View File

@ -158,10 +158,10 @@ Local<Object> getResInstance() {
return Local<Object>::New(isolate, resTemplate[std::is_same<APP, uWS::SSLApp>::value])->Clone(); return Local<Object>::New(isolate, resTemplate[std::is_same<APP, uWS::SSLApp>::value])->Clone();
} }
/* The only thing this req needs is getHeader and similar, getParameter, getUrl and so on */
Persistent<Object> reqTemplate; Persistent<Object> reqTemplate;
void req_getHeader(const FunctionCallbackInfo<Value> &args) { void req_getHeader(const FunctionCallbackInfo<Value> &args) {
// get string
NativeString data(args.GetIsolate(), args[0]); NativeString data(args.GetIsolate(), args[0]);
char *buf = data.getData(); int length = data.getLength(); char *buf = data.getData(); int length = data.getLength();
@ -175,13 +175,21 @@ template <typename APP>
void uWS_App_ws(const FunctionCallbackInfo<Value> &args) { void uWS_App_ws(const FunctionCallbackInfo<Value> &args) {
APP *app = (APP *) args.Holder()->GetAlignedPointerFromInternalField(0); APP *app = (APP *) args.Holder()->GetAlignedPointerFromInternalField(0);
// pattern
NativeString nativeString(args.GetIsolate(), args[0]); NativeString nativeString(args.GetIsolate(), args[0]);
// todo: small leak here, should be unique_ptrs moved in
Persistent<Function> *openPf = new Persistent<Function>(); Persistent<Function> *openPf = new Persistent<Function>();
Persistent<Function> *messagePf = new Persistent<Function>(); Persistent<Function> *messagePf = new Persistent<Function>();
Persistent<Function> *drainPf = new Persistent<Function>();
Persistent<Function> *closePf = new Persistent<Function>();
int maxPayloadLength = 0; int maxPayloadLength = 0;
/* For now, let's have 0, 1, 2 be from nothing to shared, to dedicated */
int compression = 0;
uWS::CompressOptions mappedCompression = uWS::CompressOptions::DISABLED;
struct PerSocketData { struct PerSocketData {
Persistent<Object> *socketPf; Persistent<Object> *socketPf;
}; };
@ -193,52 +201,97 @@ void uWS_App_ws(const FunctionCallbackInfo<Value> &args) {
/* maxPayloadLength */ /* maxPayloadLength */
maxPayloadLength = behaviorObject->Get(String::NewFromUtf8(isolate, "maxPayloadLength"))->Int32Value(); maxPayloadLength = behaviorObject->Get(String::NewFromUtf8(isolate, "maxPayloadLength"))->Int32Value();
/* Compression */
compression = behaviorObject->Get(String::NewFromUtf8(isolate, "compression"))->Int32Value();
/* Open */ /* Open */
openPf->Reset(args.GetIsolate(), Local<Function>::Cast(behaviorObject->Get(String::NewFromUtf8(isolate, "open")))); openPf->Reset(args.GetIsolate(), Local<Function>::Cast(behaviorObject->Get(String::NewFromUtf8(isolate, "open"))));
/* Message */ /* Message */
messagePf->Reset(args.GetIsolate(), Local<Function>::Cast(behaviorObject->Get(String::NewFromUtf8(isolate, "message")))); messagePf->Reset(args.GetIsolate(), Local<Function>::Cast(behaviorObject->Get(String::NewFromUtf8(isolate, "message"))));
/* Drain */
drainPf->Reset(args.GetIsolate(), Local<Function>::Cast(behaviorObject->Get(String::NewFromUtf8(isolate, "drain"))));
/* Close */
closePf->Reset(args.GetIsolate(), Local<Function>::Cast(behaviorObject->Get(String::NewFromUtf8(isolate, "close"))));
}
/* Map compression options from integer values */
if (compression == 1) {
mappedCompression = uWS::CompressOptions::SHARED_COMPRESSOR;
} else if (compression == 2) {
mappedCompression = uWS::CompressOptions::DEDICATED_COMPRESSOR;
} }
app->template ws<PerSocketData>(std::string(nativeString.getData(), nativeString.getLength()), { app->template ws<PerSocketData>(std::string(nativeString.getData(), nativeString.getLength()), {
/* idleTimeout */
/*.compression = uWS::SHARED_COMPRESSOR,*/ .compression = mappedCompression,
.maxPayloadLength = maxPayloadLength, .maxPayloadLength = maxPayloadLength,
/* Handlers */
.open = [openPf](auto *ws, auto *req) { .open = [openPf](auto *ws, auto *req) {
HandleScope hs(isolate); HandleScope hs(isolate);
/* Create a new websocket object */ /* Create a new websocket object */
Local<Object> wsObject = getWsInstance<APP>(); Local<Object> wsObject = getWsInstance<APP>();
wsObject->SetAlignedPointerInInternalField(0, ws); wsObject->SetAlignedPointerInInternalField(0, ws);
/* Attach a new V8 object with pointer to us, to us */ /* Attach a new V8 object with pointer to us, to us */
PerSocketData *perSocketData = (PerSocketData *) ws->getUserData(); PerSocketData *perSocketData = (PerSocketData *) ws->getUserData();
perSocketData->socketPf = new Persistent<Object>; perSocketData->socketPf = new Persistent<Object>;
perSocketData->socketPf->Reset(isolate, wsObject); perSocketData->socketPf->Reset(isolate, wsObject);
Local<Value> argv[] = {wsObject}; Local<Value> argv[] = {wsObject};
Local<Function>::New(isolate, *openPf)->Call(isolate->GetCurrentContext()->Global(), 1, argv); Local<Function>::New(isolate, *openPf)->Call(isolate->GetCurrentContext()->Global(), 1, argv);
}, },
.message = [messagePf](auto *ws, std::string_view message, uWS::OpCode opCode) { .message = [messagePf](auto *ws, std::string_view message, uWS::OpCode opCode) {
HandleScope hs(isolate); HandleScope hs(isolate);
Local<ArrayBuffer> messageArrayBuffer = ArrayBuffer::New(isolate, (void *) message.data(), message.length());
PerSocketData *perSocketData = (PerSocketData *) ws->getUserData(); PerSocketData *perSocketData = (PerSocketData *) ws->getUserData();
Local<Value> argv[3] = {Local<Object>::New(isolate, *(perSocketData->socketPf)), Local<Value> argv[3] = {Local<Object>::New(isolate, *(perSocketData->socketPf)),
ArrayBuffer::New(isolate, (void *) message.data(), message.length()), /*ArrayBuffer::New(isolate, (void *) message.data(), message.length())*/ messageArrayBuffer,
Boolean::New(isolate, opCode == uWS::OpCode::BINARY) Boolean::New(isolate, opCode == uWS::OpCode::BINARY)
}; };
Local<Function>::New(isolate, *messagePf)->Call(isolate->GetCurrentContext()->Global(), 3, argv); Local<Function>::New(isolate, *messagePf)->Call(isolate->GetCurrentContext()->Global(), 3, argv);
}/*
.drain = []() {}, /* Important: we clear the ArrayBuffer to make sure it is not invalidly used after return */
.ping = []() {}, messageArrayBuffer->Neuter();
.pong = []() {}, },
.close = []() {}*/ .drain = [drainPf](auto *ws) {
HandleScope hs(isolate);
PerSocketData *perSocketData = (PerSocketData *) ws->getUserData();
Local<Value> argv[1] = {Local<Object>::New(isolate, *(perSocketData->socketPf))
};
Local<Function>::New(isolate, *drainPf)->Call(isolate->GetCurrentContext()->Global(), 1, argv);
},
.ping = [](auto *ws) {
},
.pong = [](auto *ws) {
},
.close = [closePf](auto *ws, int code, std::string_view message) {
HandleScope hs(isolate);
Local<ArrayBuffer> messageArrayBuffer = ArrayBuffer::New(isolate, (void *) message.data(), message.length());
PerSocketData *perSocketData = (PerSocketData *) ws->getUserData();
Local<Value> argv[3] = {Local<Object>::New(isolate, *(perSocketData->socketPf)),
Integer::New(isolate, code),
messageArrayBuffer
};
Local<Function>::New(isolate, *closePf)->Call(isolate->GetCurrentContext()->Global(), 3, argv);
/* Again, here we clear the buffer to avoid strange bugs */
messageArrayBuffer->Neuter();
}
}); });
// Return this /* Return this */
args.GetReturnValue().Set(args.Holder()); args.GetReturnValue().Set(args.Holder());
} }
// todo: all other methods // todo: all other methods, in particular post!
template <typename APP> template <typename APP>
void uWS_App_get(const FunctionCallbackInfo<Value> &args) { void uWS_App_get(const FunctionCallbackInfo<Value> &args) {
APP *app = (APP *) args.Holder()->GetAlignedPointerFromInternalField(0); APP *app = (APP *) args.Holder()->GetAlignedPointerFromInternalField(0);
@ -330,6 +383,9 @@ void uWS_App(const FunctionCallbackInfo<Value> &args) {
appTemplate->InstanceTemplate()->SetInternalFieldCount(1); appTemplate->InstanceTemplate()->SetInternalFieldCount(1);
/* Most used functions will be get, post, ws, listen */
// Get and all the Http methods // Get and all the Http methods
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "get"), FunctionTemplate::New(isolate, uWS_App_get<APP>)); appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "get"), FunctionTemplate::New(isolate, uWS_App_get<APP>));