From 8e0860edcf85f631cd7a7021ebeebb638b945568 Mon Sep 17 00:00:00 2001 From: Alex Hultman Date: Thu, 17 Jan 2019 06:31:55 +0100 Subject: [PATCH] Wrap onAborted, hook up VideoStreamer --- examples/VideoStreamer.js | 76 +++++++++++++++++++-------------------- src/HttpResponseWrapper.h | 16 +++++++++ uWebSockets | 2 +- 3 files changed, 54 insertions(+), 40 deletions(-) diff --git a/examples/VideoStreamer.js b/examples/VideoStreamer.js index e249609..6f8a00f 100644 --- a/examples/VideoStreamer.js +++ b/examples/VideoStreamer.js @@ -4,7 +4,6 @@ const uWS = require('../dist/uws.js'); const fs = require('fs'); -const crypto = require('crypto'); const port = 9001; const fileName = '/home/alexhultman/Downloads/Sintel.2010.720p.mkv'; @@ -17,56 +16,55 @@ function toArrayBuffer(buffer) { return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength); } -/* Returns true on success, false if it's having backpressure */ -function tryStream(res, chunk, requestDataCb) { - /* Stream as far as we can */ - let lastOffset = res.getWriteOffset(); - if (!res.tryEnd(chunk, totalSize)) { - /* Save unsent chunk for when we can send it */ - res.chunk = chunk; - res.chunkOffset = lastOffset; +/* Helper function to pipe the ReadaleStream over an Http responses */ +function pipeStreamOverResponse(res, readStream, totalSize) { + /* Careful! If Node.js would emit error before the first res.tryEnd, res will hang and never time out */ + /* For this demo, I skipped checking for Node.js errors, you are free to PR fixes to this example */ + readStream.on('data', (chunk) => { + /* We only take standard V8 units of data */ + const ab = toArrayBuffer(chunk); - res.onWritable((offset) => { - if (res.tryEnd(res.chunk.slice(offset - res.chunkOffset), totalSize)) { - requestDataCb(); - return true; - } - return false; - }); - /* Return failure */ - return false; - } - /* Return success */ - return true; + /* Stream as far as we can */ + let lastOffset = res.getWriteOffset(); + if (!res.tryEnd(ab, totalSize)) { + /* If we could not send this chunk, pause */ + readStream.pause(); + + /* Save unsent chunk for when we can send it */ + res.ab = ab; + res.abOffset = lastOffset; + + /* Register async handlers for drainage */ + res.onWritable((offset) => { + if (res.tryEnd(res.ab.slice(offset - res.abOffset), totalSize)) { + readStream.resume(); + return true; + } + return false; + }); + + /* Register async abortion handlers */ + res.onAborted(() => { + console.log('Res is no longer valid!'); + readStream.destroy(); + }); + } + }).on('end', () => { + /* Todo: handle errors of the stream, probably good to simply close the response */ + }); } /* Yes, you can easily swap to SSL streaming by uncommenting here */ -const app = uWS./*SSL*/App({ +const app = uWS.SSLApp({ key_file_name: '/home/alexhultman/key.pem', cert_file_name: '/home/alexhultman/cert.pem', passphrase: '1234' }).get('/sintel.mkv', (res, req) => { /* Log */ console.log("Streaming Sintel video..."); - /* Create read stream with Node.js and start streaming over Http */ const readStream = fs.createReadStream(fileName); - const hash = crypto.createHash('md5'); - readStream.on('data', (chunk) => { - - const ab = toArrayBuffer(chunk); - hash.update(chunk); - - if (!tryStream(res, ab, () => { - /* Called once we want more data */ - readStream.resume(); - })) { - /* If we could not send this chunk, pause */ - readStream.pause(); - } - }).on('end', () => { - console.log("md5: " + hash.digest('hex')); - }); + pipeStreamOverResponse(res, readStream, totalSize); }).get('/*', (res, req) => { /* Make sure to always handle every route */ res.end('Nothing to see here!'); diff --git a/src/HttpResponseWrapper.h b/src/HttpResponseWrapper.h index 8ddcc2d..ecd51c8 100644 --- a/src/HttpResponseWrapper.h +++ b/src/HttpResponseWrapper.h @@ -14,6 +14,21 @@ struct HttpResponseWrapper { // res.onData(JS function) // res.onAborted + template + static void res_onAborted(const FunctionCallbackInfo &args) { + /* This thing perfectly fits in with unique_function, and will Reset on destructor */ + UniquePersistent p(isolate, Local::Cast(args[0])); + + getHttpResponse(args)->onAborted([p = std::move(p)]() { + HandleScope hs(isolate); + + Local argv[] = {}; + Local::New(isolate, p)->Call(isolate->GetCurrentContext()->Global(), 0, argv); + }); + + args.GetReturnValue().Set(args.Holder()); + } + /* Returns the current write offset */ template static void res_getWriteOffset(const FunctionCallbackInfo &args) { @@ -108,6 +123,7 @@ struct HttpResponseWrapper { resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "getWriteOffset"), FunctionTemplate::New(isolate, res_getWriteOffset)); resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "onWritable"), FunctionTemplate::New(isolate, res_onWritable)); + resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "onAborted"), FunctionTemplate::New(isolate, res_onAborted)); /* Create our template */ Local resObjectLocal = resTemplateLocal->GetFunction()->NewInstance(isolate->GetCurrentContext()).ToLocalChecked(); diff --git a/uWebSockets b/uWebSockets index 0ca061e..6718301 160000 --- a/uWebSockets +++ b/uWebSockets @@ -1 +1 @@ -Subproject commit 0ca061e7727df4fcc0c840e67188e093f966e7d3 +Subproject commit 6718301ac9f0385f3146bab70c9fd7d8fa305562