Wrap onAborted, hook up VideoStreamer

Alex Hultman 2019-01-17 06:31:55 +01:00
parent 24fe602f50
commit 8e0860edcf
3 changed files with 54 additions and 40 deletions


@@ -4,7 +4,6 @@
 const uWS = require('../dist/uws.js');
 const fs = require('fs');
-const crypto = require('crypto');
 const port = 9001;
 const fileName = '/home/alexhultman/Downloads/Sintel.2010.720p.mkv';
@@ -17,56 +16,55 @@ function toArrayBuffer(buffer) {
   return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength);
 }
 
-/* Returns true on success, false if it's having backpressure */
-function tryStream(res, chunk, requestDataCb) {
-  /* Stream as far as we can */
-  let lastOffset = res.getWriteOffset();
-  if (!res.tryEnd(chunk, totalSize)) {
-    /* Save unsent chunk for when we can send it */
-    res.chunk = chunk;
-    res.chunkOffset = lastOffset;
-    res.onWritable((offset) => {
-      if (res.tryEnd(res.chunk.slice(offset - res.chunkOffset), totalSize)) {
-        requestDataCb();
-        return true;
-      }
-      return false;
-    });
-    /* Return failure */
-    return false;
-  }
-  /* Return success */
-  return true;
+/* Helper function to pipe a ReadableStream over an Http response */
+function pipeStreamOverResponse(res, readStream, totalSize) {
+  /* Careful! If Node.js emits an error before the first res.tryEnd, res will hang and never time out */
+  /* For this demo, I skipped checking for Node.js errors, you are free to PR fixes to this example */
+  readStream.on('data', (chunk) => {
+    /* We only take standard V8 units of data */
+    const ab = toArrayBuffer(chunk);
+
+    /* Stream as far as we can */
+    let lastOffset = res.getWriteOffset();
+    if (!res.tryEnd(ab, totalSize)) {
+      /* If we could not send this chunk, pause */
+      readStream.pause();
+
+      /* Save unsent chunk for when we can send it */
+      res.ab = ab;
+      res.abOffset = lastOffset;
+
+      /* Register async handlers for drainage */
+      res.onWritable((offset) => {
+        if (res.tryEnd(res.ab.slice(offset - res.abOffset), totalSize)) {
+          readStream.resume();
+          return true;
+        }
+        return false;
+      });
+
+      /* Register async abortion handlers */
+      res.onAborted(() => {
+        console.log('Res is no longer valid!');
+        readStream.destroy();
+      });
+    }
+  }).on('end', () => {
+    /* Todo: handle errors of the stream, probably good to simply close the response */
+  });
 }
 /* Yes, you can easily swap to SSL streaming by uncommenting here */
-const app = uWS./*SSL*/App({
+const app = uWS.SSLApp({
   key_file_name: '/home/alexhultman/key.pem',
   cert_file_name: '/home/alexhultman/cert.pem',
   passphrase: '1234'
 }).get('/sintel.mkv', (res, req) => {
   /* Log */
   console.log("Streaming Sintel video...");
   /* Create read stream with Node.js and start streaming over Http */
   const readStream = fs.createReadStream(fileName);
-  const hash = crypto.createHash('md5');
-  readStream.on('data', (chunk) => {
-    const ab = toArrayBuffer(chunk);
-    hash.update(chunk);
-    if (!tryStream(res, ab, () => {
-      /* Called once we want more data */
-      readStream.resume();
-    })) {
-      /* If we could not send this chunk, pause */
-      readStream.pause();
-    }
-  }).on('end', () => {
-    console.log("md5: " + hash.digest('hex'));
-  });
+  pipeStreamOverResponse(res, readStream, totalSize);
 }).get('/*', (res, req) => {
   /* Make sure to always handle every route */
   res.end('Nothing to see here!');
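
A note on the drainage arithmetic in pipeStreamOverResponse above: when res.tryEnd cannot fit the whole chunk, the example remembers the write offset it saw before the attempt, and when onWritable later fires with a new offset, the difference between the two is how many bytes of the saved chunk already went out, so slicing at that difference yields exactly the unsent tail. The standalone sketch below just replays that arithmetic with made-up numbers and does not touch uWS at all; totalSize in the example is the file's full byte length, defined in unchanged context above these hunks.

/* Standalone sketch of the offset arithmetic from pipeStreamOverResponse; all numbers are hypothetical */
const chunk = Buffer.alloc(600, 0x61);   /* the chunk we tried to send (600 bytes) */
const lastOffset = 1000;                 /* res.getWriteOffset() taken before res.tryEnd(chunk, totalSize) */

/* Suppose only 200 of the 600 bytes fit before backpressure kicked in; uWS later invokes the
   onWritable callback with the current offset once the socket can take more data */
const offsetFromOnWritable = 1200;

/* (offset - lastOffset) is how far into the saved chunk the stream already got, so the slice
   below is exactly the unsent remainder that gets retried with res.tryEnd(...) */
const remainder = chunk.slice(offsetFromOnWritable - lastOffset);
console.log(remainder.length + ' bytes left to retry');   /* prints "400 bytes left to retry" */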


@@ -14,6 +14,21 @@ struct HttpResponseWrapper {
     // res.onData(JS function)
     // res.onAborted
 
+    template <bool SSL>
+    static void res_onAborted(const FunctionCallbackInfo<Value> &args) {
+        /* This thing perfectly fits in with unique_function, and will Reset on destructor */
+        UniquePersistent<Function> p(isolate, Local<Function>::Cast(args[0]));
+        getHttpResponse<SSL>(args)->onAborted([p = std::move(p)]() {
+            HandleScope hs(isolate);
+            Local<Value> argv[] = {};
+            Local<Function>::New(isolate, p)->Call(isolate->GetCurrentContext()->Global(), 0, argv);
+        });
+        args.GetReturnValue().Set(args.Holder());
+    }
+
     /* Returns the current write offset */
     template <bool SSL>
     static void res_getWriteOffset(const FunctionCallbackInfo<Value> &args) {
@@ -108,6 +123,7 @@ struct HttpResponseWrapper {
         resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "getWriteOffset"), FunctionTemplate::New(isolate, res_getWriteOffset<SSL>));
         resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "onWritable"), FunctionTemplate::New(isolate, res_onWritable<SSL>));
+        resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "onAborted"), FunctionTemplate::New(isolate, res_onAborted<SSL>));
 
         /* Create our template */
         Local<Object> resObjectLocal = resTemplateLocal->GetFunction()->NewInstance(isolate->GetCurrentContext()).ToLocalChecked();
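
With the binding registered above, res.onAborted becomes callable from JavaScript, and it is what the VideoStreamer example uses to destroy its read stream when the peer disconnects. The sketch below shows the general calling pattern in a route handler; it is a minimal illustration, and the res.aborted flag is a convention of this sketch rather than something the addon sets for you.

/* Minimal sketch of the res.onAborted pattern exposed by the binding above.
   The res.aborted flag is a convention of this sketch, not set by uWS itself. */
const uWS = require('../dist/uws.js');

uWS.App().get('/slow', (res, req) => {
  /* Register the abort handler first, before any async work begins */
  res.onAborted(() => {
    /* The client went away; using res after this point would be invalid */
    res.aborted = true;
  });

  /* Simulate an asynchronous data source */
  setTimeout(() => {
    if (!res.aborted) {
      res.end('finally done');
    }
  }, 1000);
}).listen(9001, (token) => {
  console.log(token ? 'Listening on 9001' : 'Failed to listen on 9001');
});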

@@ -1 +1 @@
-Subproject commit 0ca061e7727df4fcc0c840e67188e093f966e7d3
+Subproject commit 6718301ac9f0385f3146bab70c9fd7d8fa305562