diff --git a/examples/VideoStreamer.js b/examples/VideoStreamer.js index 0424c9c..fd3e3c9 100644 --- a/examples/VideoStreamer.js +++ b/examples/VideoStreamer.js @@ -6,9 +6,12 @@ const uWS = require('../dist/uws.js'); const fs = require('fs'); const port = 9001; -const fileName = '/home/alexhultman/Downloads/Sintel.2010.720p.mkv'; +const fileName = 'C:\\Users\\Alex\\Downloads\\Sintel.2010.720p.mkv'; const totalSize = fs.statSync(fileName).size; +let openStreams = 0; +let streamIndex = 0; + console.log('Video size is: ' + totalSize + ' bytes'); /* Helper function converting Node.js buffer to ArrayBuffer */ @@ -16,6 +19,21 @@ function toArrayBuffer(buffer) { return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength); } +/* Either onAborted or simply finished request */ +function onAbortedOrFinishedResponse(res, readStream) { + + if (res.id == -1) { + console.log("ERROR! onAbortedOrFinishedResponse called twice for the same res!"); + } else { + console.log('Stream was closed, openStreams: ' + --openStreams); + console.timeEnd(res.id); + readStream.destroy(); + } + + /* Mark this response already accounted for */ + res.id = -1; +} + /* Helper function to pipe the ReadaleStream over an Http responses */ function pipeStreamOverResponse(res, readStream, totalSize) { /* Careful! If Node.js would emit error before the first res.tryEnd, res will hang and never time out */ @@ -24,9 +42,16 @@ function pipeStreamOverResponse(res, readStream, totalSize) { /* We only take standard V8 units of data */ const ab = toArrayBuffer(chunk); - /* Stream as far as we can */ + /* Store where we are, globally, in our response */ let lastOffset = res.getWriteOffset(); - if (!res.tryEnd(ab, totalSize)) { + + /* Streaming a chunk returns whether that chunk was sent, and if that chunk was last */ + let [ok, done] = res.tryEnd(ab, totalSize); + + /* Did we successfully send last chunk? */ + if (done) { + onAbortedOrFinishedResponse(res, readStream); + } else if (!ok) { /* If we could not send this chunk, pause */ readStream.pause(); @@ -36,32 +61,45 @@ function pipeStreamOverResponse(res, readStream, totalSize) { /* Register async handlers for drainage */ res.onWritable((offset) => { - if (res.tryEnd(res.ab.slice(offset - res.abOffset), totalSize)) { + /* Here the timeout is off, we can spend as much time before calling tryEnd we want to */ + + /* On failure the timeout will start */ + let [ok, done] = res.tryEnd(res.ab.slice(offset - res.abOffset), totalSize); + if (done) { + onAbortedOrFinishedResponse(res, readStream); + } else if (ok) { + /* We sent a chunk and it was not the last one, so let's resume reading. + * Timeout is still disabled, so we can spend any amount of time waiting + * for more chunks to send. */ readStream.resume(); - return true; } - return false; + + /* We always have to return true/false in onWritable. + * If you did not send anything, return true for success. */ + return ok; }); } - }).on('end', () => { + + }).on('error', () => { /* Todo: handle errors of the stream, probably good to simply close the response */ + console.log('Unhandled read error from Node.js, you need to handle this!'); }); /* If you plan to asyncronously respond later on, you MUST listen to onAborted BEFORE returning */ res.onAborted(() => { - console.log('Res is no longer valid!'); - readStream.destroy(); + onAbortedOrFinishedResponse(res, readStream); }); } /* Yes, you can easily swap to SSL streaming by uncommenting here */ -const app = uWS.SSLApp({ +const app = uWS./*SSL*/App({ key_file_name: 'misc/key.pem', cert_file_name: 'misc/cert.pem', passphrase: '1234' }).get('/sintel.mkv', (res, req) => { /* Log */ - console.log("Streaming Sintel video..."); + console.time(res.id = ++streamIndex); + console.log('Stream was opened, openStreams: ' + ++openStreams); /* Create read stream with Node.js and start streaming over Http */ const readStream = fs.createReadStream(fileName); pipeStreamOverResponse(res, readStream, totalSize); diff --git a/src/HttpResponseWrapper.h b/src/HttpResponseWrapper.h index 32e8545..d2d3888 100644 --- a/src/HttpResponseWrapper.h +++ b/src/HttpResponseWrapper.h @@ -160,7 +160,12 @@ struct HttpResponseWrapper { invalidateResObject(args); } - args.GetReturnValue().Set(Boolean::New(isolate, ok)); + /* This is a quick fix, it will need updating in µWS later on */ + Local array = Array::New(isolate, 2); + array->Set(0, Boolean::New(isolate, ok)); + array->Set(1, Boolean::New(isolate, res->hasResponded())); + + args.GetReturnValue().Set(array); } } diff --git a/uWebSockets b/uWebSockets index eaa9467..5528e6f 160000 --- a/uWebSockets +++ b/uWebSockets @@ -1 +1 @@ -Subproject commit eaa9467d1f680787f0f5c84ee4bf4a8e68a6ed70 +Subproject commit 5528e6f5fec2ffbd8e684e5d7d966eb0ae2bd5d5