Skip to content

Commit

Permalink
feat, perf: w3c-like File, file-iterator (#2414)
Browse files Browse the repository at this point in the history
* feat, perf: w3c-like File, file-iterator

* feat: w3c File fields

* fix: spec compliance, iterator event

* perf: lazy load iterator on web stream
feat: add iterator tests
breaking: deprecate getBuffer

* fix: reading when destroyed
  • Loading branch information
ThaUnknown authored Dec 11, 2022
1 parent b6c2a4e commit 69d85a8
Show file tree
Hide file tree
Showing 17 changed files with 401 additions and 266 deletions.
99 changes: 99 additions & 0 deletions lib/file-iterator.js
Original file line number Diff line number Diff line change
@@ -0,0 1,99 @@
import debugFactory from 'debug'
import EventEmitter from 'events'

const debug = debugFactory('webtorrent:file-iterator')

/**
* Async iterator of a torrent file
*
* @param {File} file
* @param {Object} opts
* @param {number} opts.start iterator slice of file, starting from this byte (inclusive)
* @param {number} opts.end iterator slice of file, ending with this byte (inclusive)
*/
export default class FileIterator extends EventEmitter {
constructor (file, { start, end }) {
super()

this._torrent = file._torrent

this._pieceLength = file._torrent.pieceLength

this._startPiece = (start file.offset) / this._pieceLength | 0
this._endPiece = (end file.offset) / this._pieceLength | 0

this._piece = this._startPiece
this._offset = (start file.offset) - (this._startPiece * this._pieceLength)

this._missing = end - start 1
this._criticalLength = Math.min((1024 * 1024 / this._pieceLength) | 0, 2)

this._torrent.select(this._startPiece, this._endPiece, true, () => {
this.emit('_notify')
})
this.destroyed = false
}

[Symbol.asyncIterator] () {
return this
}

next () {
return new Promise((resolve, reject) => {
if (this._missing === 0 || this.destroyed) {
resolve({ done: true })
return this.destroy()
}
const pump = (index, opts) => {
if (!this._torrent.bitfield.get(index)) {
this._torrent.critical(index, index this._criticalLength)
const listener = () => {
if (this._torrent.bitfield.get(index)) {
this.removeListener('_notify', listener)
pump(index, opts)
}
}
return this.on('_notify', listener)
}

if (this._torrent.destroyed) return reject(new Error('Torrent removed'))

this._torrent.store.get(index, opts, (err, buffer) => {
if (this.destroyed) return resolve({ done: true }) // prevent hanging
debug('read %s and yielding (length %s) (err %s)', index, buffer?.length, err?.message)

if (err) return reject(err)

// prevent re-wrapping outside of promise
resolve({ value: buffer, done: false })
})
}

const length = Math.min(this._missing, this._pieceLength) - this._offset

pump(this._piece , { length, offset: this._offset })
this._missing -= length
this._offset = 0
})
}

async return () {
this.destroy()
const { value } = await this.next()
return { done: true, value }
}

async throw (err) {
throw err
}

destroy (cb = () => {}, err) {
if (this.destroyed) return
this.destroyed = true
if (!this._torrent.destroyed) {
this._torrent.deselect(this._startPiece, this._endPiece, true)
}
this.emit('return')
cb(err)
}
}
101 changes: 0 additions & 101 deletions lib/file-stream.js

This file was deleted.

91 changes: 65 additions & 26 deletions lib/file.js
Original file line number Diff line number Diff line change
@@ -1,10 1,8 @@
import EventEmitter from 'events'
import { PassThrough } from 'streamx'
import { BlobWriteStream } from 'fast-blob-stream'
import streamToBuffer from 'stream-with-known-length-to-buffer'
import queueMicrotask from 'queue-microtask'
import { Readable } from 'streamx'
import { chunkStoreRead } from 'chunk-store-iterator'
import mime from 'mime/lite.js'
import FileStream from './file-stream.js'
import FileIterator from './file-iterator.js'

export default class File extends EventEmitter {
constructor (torrent, file) {
Expand All @@ -13,10 11,13 @@ export default class File extends EventEmitter {
this._torrent = torrent
this._destroyed = false
this._fileStreams = new Set()
this._iterators = new Set()

this.name = file.name
this.path = file.path
this.length = file.length
this.size = file.length
this.type = mime.getType(this.name) || 'application/octet-stream'
this.offset = file.offset

this.done = false
Expand Down Expand Up @@ -93,16 94,32 @@ export default class File extends EventEmitter {
this._torrent.deselect(this._startPiece, this._endPiece, false)
}

createReadStream (opts) {
if (this.length === 0) {
const empty = new PassThrough()
queueMicrotask(() => {
empty.end()
})
return empty
[Symbol.asyncIterator] (opts = {}) {
if (this.length === 0) return (async function * empty () {})()

const { start = 0 } = opts ?? {}
const end = (
(opts && opts.end && opts.end < this.length)
? opts.end
: this.length - 1
) this.offset

if (this.done) {
return chunkStoreRead(this._torrent.store, { offset: start, length: end - start 1 })
}

const fileStream = new FileStream(this, opts)
const iterator = new FileIterator(this, { start, end })
this._iterators.add(iterator)
iterator.once('return', () => {
this._iterators.delete(iterator)
})

return iterator
}

createReadStream (opts) {
const iterator = this[Symbol.asyncIterator](opts)
const fileStream = Readable.from(iterator)

this._fileStreams.add(fileStream)
fileStream.once('close', () => {
Expand All @@ -112,32 129,50 @@ export default class File extends EventEmitter {
return fileStream
}

getBuffer (cb) {
streamToBuffer(this.createReadStream(), this.length, cb)
async arrayBuffer (cb = () => {}) {
const data = new Uint8Array(this.length)
let offset = 0
for await (const chunk of this) {
data.set(chunk, offset)
offset = chunk.length
}
cb(null, data)
return data.buffer
}

getBlob (cb) {
if (typeof window === 'undefined') throw new Error('browser-only method')
const writeStream = new BlobWriteStream(blob => {
cb(null, blob)
}, { mimeType: mime.getType(this.name) || 'application/octet-stream' })
this.createReadStream().pipe(writeStream)
async blob () {
return new Blob(await this.arrayBuffer(), { mimeType: this.type })
}

getBlobURL (cb) {
this.getBlob((_err, blob) => {
cb(null, URL.createObjectURL(blob))
stream (opts) {
let iterator
return new ReadableStream({
start () {
iterator = this[Symbol.asyncIterator](opts)
},
async pull ({ close, enqueue }) {
const { value, done } = await iterator.next()

if (done) {
close()
} else {
enqueue(value)
}
},
cancel () {
iterator.return()
}
})
}

getStreamURL () {
get streamURL () {
if (!this._client._server) throw new Error('No server created')
const url = `${this._client._server.pathname}/${this._torrent.infoHash}/${encodeURI(this.path)}`
return url
}

streamTo (elem) {
elem.src = this.getStreamURL()
elem.src = this.streamURL
return elem
}

Expand All @@ -153,5 188,9 @@ export default class File extends EventEmitter {
fileStream.destroy()
}
this._fileStreams.clear()
for (const iterator of this._iterators) {
iterator.destroy()
}
this._iterators.clear()
}
}
19 changes: 13 additions & 6 deletions lib/server.js
Original file line number Diff line number Diff line change
@@ -1,6 1,5 @@
import http from 'http'
import escapeHtml from 'escape-html'
import mime from 'mime/lite.js'
import pump from 'pump'
import rangeParser from 'range-parser'
import queueMicrotask from 'queue-microtask'
Expand Down Expand Up @@ -116,7 115,7 @@ class ServerBase {
res.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate, max-age=0'
// Support range-requests
res.headers['Accept-Ranges'] = 'bytes'
res.headers['Content-Type'] = mime.getType(file.name) || 'application/octet-stream'
res.headers['Content-Type'] = file.type
// Support DLNA streaming
res.headers['transferMode.dlna.org'] = 'Streaming'
res.headers['contentFeatures.dlna.org'] = 'DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=01700000000000000000000000000000'
Expand Down Expand Up @@ -150,15 149,23 @@ class ServerBase {
res.headers['Content-Length'] = file.length
}

const stream = req.method === 'GET' && file.createReadStream(range)
if (req.method === 'GET') {
const iterator = file[Symbol.asyncIterator](range)
let transform = null
file.emit('iterator', { iterator, req, file }, target => {
transform = target
})

let pipe = null
if (stream) {
const stream = Readable.from(transform || iterator)
let pipe = null
file.emit('stream', { stream, req, file }, target => {
pipe = pump(stream, target)
})

res.body = pipe || stream
} else {
res.body = false
}
res.body = pipe || stream
return res
}

Expand Down
Loading

0 comments on commit 69d85a8

Please sign in to comment.