From a12354e7cd87ed85ab2a0b2e201e25747119a9d4 Mon Sep 17 00:00:00 2001 From: Diego Paez Date: Tue, 12 May 2020 15:54:25 -0300 Subject: [PATCH 01/28] Update: add hyperdrive client import/export tests --- test/hyperdrive.js | 65 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/test/hyperdrive.js b/test/hyperdrive.js index 25327ad..8f7fdb5 100644 --- a/test/hyperdrive.js +++ b/test/hyperdrive.js @@ -1,3 +1,7 @@ +const { promises: { mkdtemp, writeFile, readdir } } = require('fs') +const { tmpdir } = require('os') +const { once } = require('events') +const { join } = require('path') const test = require('tape') const collectStream = require('stream-collector') @@ -753,6 +757,67 @@ test('can get all network configurations', async t => { t.end() }) +test.only('import', async t => { + const { client, cleanup } = await createOne() + + // create a tmp dir + const tmpDir = await mkdtemp(join(tmpdir(), 'import-test')) + await writeFile(join(tmpDir, 'test.txt'), 'hello world') + + try { + // now import the dir + const drive = await client.drive.get() + const importProgress = client.drive.import(tmpDir, drive) + + const [, dst] = await once(importProgress, 'put-end') + t.same(dst.name, '/test.txt') + const contents = await drive.readFile('test.txt', { encoding: 'utf8' }) + t.same(contents, 'hello world') + + await drive.close() + + importProgress.destroy() + } catch (err) { + t.fail(err) + } + + await cleanup() + t.end() +}) + +test('export', async t => { + const { client, cleanup } = await createOne() + + // create a tmp dir + const tmpDir = await mkdtemp(join(tmpdir(), 'export-test')) + + try { + // create a test drive + const drive = await client.drive.get() + await drive.writeFile('export.txt', 'hello world') + + const dw = client.drive.export(drive, tmpDir) + await dw.start() + + let total, downloaded + dw.on('stats', async stats => { + total = stats.total + downloaded = stats.downloaded + console.log({ total, downloaded }) + if (total === downloaded) { + t.pass('stats OK') + const contents = await readdir(join(tmpDir, 'export.txt'), { encoding: 'utf8' }) + t.same(contents, 'hello world') + await drive.close() + await cleanup() + t.end() + } + }) + } catch (err) { + t.fail(err) + } +}) + // TODO: Figure out why the grpc server is not terminating. test.onFinish(() => { setTimeout(() => { From 597e2867a2e58ba6f0361ec427b558b83b44e5b0 Mon Sep 17 00:00:00 2001 From: Diego Paez Date: Tue, 12 May 2020 16:22:33 -0300 Subject: [PATCH 02/28] Update: remove tape only call --- test/hyperdrive.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/hyperdrive.js b/test/hyperdrive.js index 8f7fdb5..167a6c9 100644 --- a/test/hyperdrive.js +++ b/test/hyperdrive.js @@ -757,7 +757,7 @@ test('can get all network configurations', async t => { t.end() }) -test.only('import', async t => { +test('import', async t => { const { client, cleanup } = await createOne() // create a tmp dir From 259d9e433462b647dcfc5cdf9e76bb08365da752 Mon Sep 17 00:00:00 2001 From: Diego Paez Date: Fri, 15 May 2020 20:37:19 -0300 Subject: [PATCH 03/28] Update: tweak export test --- test/hyperdrive.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/hyperdrive.js b/test/hyperdrive.js index 167a6c9..0d6137b 100644 --- a/test/hyperdrive.js +++ b/test/hyperdrive.js @@ -1,4 +1,4 @@ -const { promises: { mkdtemp, writeFile, readdir } } = require('fs') +const { promises: { mkdtemp, writeFile, readFile } } = require('fs') const { tmpdir } = require('os') const { once } = require('events') const { join } = require('path') @@ -803,10 +803,9 @@ test('export', async t => { dw.on('stats', async stats => { total = stats.total downloaded = stats.downloaded - console.log({ total, downloaded }) if (total === downloaded) { t.pass('stats OK') - const contents = await readdir(join(tmpDir, 'export.txt'), { encoding: 'utf8' }) + const contents = await readFile(join(tmpDir, 'export.txt'), { encoding: 'utf8' }) t.same(contents, 'hello world') await drive.close() await cleanup() From 4dae5d1a251508ad700d14499c0af079f8f00c8f Mon Sep 17 00:00:00 2001 From: Diego Paez Date: Tue, 12 May 2020 15:54:25 -0300 Subject: [PATCH 04/28] Update: add hyperdrive client import/export tests --- test/hyperdrive.js | 65 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/test/hyperdrive.js b/test/hyperdrive.js index 2361195..a130b05 100644 --- a/test/hyperdrive.js +++ b/test/hyperdrive.js @@ -1,3 +1,7 @@ +const { promises: { mkdtemp, writeFile, readdir } } = require('fs') +const { tmpdir } = require('os') +const { once } = require('events') +const { join } = require('path') const test = require('tape') const collectStream = require('stream-collector') @@ -878,6 +882,67 @@ test('can get all network configurations', async t => { t.end() }) +test.only('import', async t => { + const { client, cleanup } = await createOne() + + // create a tmp dir + const tmpDir = await mkdtemp(join(tmpdir(), 'import-test')) + await writeFile(join(tmpDir, 'test.txt'), 'hello world') + + try { + // now import the dir + const drive = await client.drive.get() + const importProgress = client.drive.import(tmpDir, drive) + + const [, dst] = await once(importProgress, 'put-end') + t.same(dst.name, '/test.txt') + const contents = await drive.readFile('test.txt', { encoding: 'utf8' }) + t.same(contents, 'hello world') + + await drive.close() + + importProgress.destroy() + } catch (err) { + t.fail(err) + } + + await cleanup() + t.end() +}) + +test('export', async t => { + const { client, cleanup } = await createOne() + + // create a tmp dir + const tmpDir = await mkdtemp(join(tmpdir(), 'export-test')) + + try { + // create a test drive + const drive = await client.drive.get() + await drive.writeFile('export.txt', 'hello world') + + const dw = client.drive.export(drive, tmpDir) + await dw.start() + + let total, downloaded + dw.on('stats', async stats => { + total = stats.total + downloaded = stats.downloaded + console.log({ total, downloaded }) + if (total === downloaded) { + t.pass('stats OK') + const contents = await readdir(join(tmpDir, 'export.txt'), { encoding: 'utf8' }) + t.same(contents, 'hello world') + await drive.close() + await cleanup() + t.end() + } + }) + } catch (err) { + t.fail(err) + } +}) + // TODO: Figure out why the grpc server is not terminating. test.onFinish(() => { setTimeout(() => { From 51a5e288e4a3a852902579a260224f1a71921e14 Mon Sep 17 00:00:00 2001 From: Diego Paez Date: Tue, 12 May 2020 16:22:33 -0300 Subject: [PATCH 05/28] Update: remove tape only call --- test/hyperdrive.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/hyperdrive.js b/test/hyperdrive.js index a130b05..51f7997 100644 --- a/test/hyperdrive.js +++ b/test/hyperdrive.js @@ -882,7 +882,7 @@ test('can get all network configurations', async t => { t.end() }) -test.only('import', async t => { +test('import', async t => { const { client, cleanup } = await createOne() // create a tmp dir From 57332ca6d72658fd17c0a77400b0a233199a9ae5 Mon Sep 17 00:00:00 2001 From: Diego Paez Date: Fri, 15 May 2020 20:37:19 -0300 Subject: [PATCH 06/28] Update: tweak export test --- test/hyperdrive.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/hyperdrive.js b/test/hyperdrive.js index 51f7997..2d2abcb 100644 --- a/test/hyperdrive.js +++ b/test/hyperdrive.js @@ -1,4 +1,4 @@ -const { promises: { mkdtemp, writeFile, readdir } } = require('fs') +const { promises: { mkdtemp, writeFile, readFile } } = require('fs') const { tmpdir } = require('os') const { once } = require('events') const { join } = require('path') @@ -928,10 +928,9 @@ test('export', async t => { dw.on('stats', async stats => { total = stats.total downloaded = stats.downloaded - console.log({ total, downloaded }) if (total === downloaded) { t.pass('stats OK') - const contents = await readdir(join(tmpDir, 'export.txt'), { encoding: 'utf8' }) + const contents = await readFile(join(tmpDir, 'export.txt'), { encoding: 'utf8' }) t.same(contents, 'hello world') await drive.close() await cleanup() From 532d09a1877ad0147dfc7bd6bd820905580cab18 Mon Sep 17 00:00:00 2001 From: Andrew Osheroff Date: Sat, 6 Jun 2020 20:23:53 +0200 Subject: [PATCH 07/28] Undo createReadStream end option fix + add test --- lib/drives/index.js | 2 +- test/hyperdrive.js | 58 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/lib/drives/index.js b/lib/drives/index.js index 1370254..1b38ac8 100644 --- a/lib/drives/index.js +++ b/lib/drives/index.js @@ -637,7 +637,7 @@ class DriveManager extends EventEmitter { const drive = this.driveForSession(id) const streamOpts = {} - if (end > 0 || (end === 0 && start === 0)) streamOpts.end = end + if (end !== 0) streamOpts.end = end if (length !== 0) streamOpts.length = length streamOpts.start = start diff --git a/test/hyperdrive.js b/test/hyperdrive.js index f888ac9..3c57c9e 100644 --- a/test/hyperdrive.js +++ b/test/hyperdrive.js @@ -182,6 +182,64 @@ test('can write/read a file from a remote hyperdrive using stream methods', asyn t.end() }) +test('assorted read parameters to createReadStream', async t => { + const { client, cleanup } = await createOne() + + try { + const drive = await client.drive.get() + t.true(drive.key) + t.same(drive.id, 1) + + let blocks = ['hello', 'hello', 'world', 'world'] + let complete = blocks.join('') + let tests = [ + { + params: {}, + value: complete + }, + { + params: { end: 10 }, + value: complete.slice(0, 10 + 1) + }, + { + params: { start: 4, end: 10 }, + value: complete.slice(4, 10 + 1) + } + ] + + const writeStream = drive.createWriteStream('hello', { uid: 999, gid: 999 }) + for (let block of blocks) { + writeStream.write(block) + } + writeStream.end() + + await new Promise((resolve, reject) => { + writeStream.on('error', reject) + writeStream.on('finish', resolve) + }) + + console.log('wrote blocks') + + for (let { params, value } of tests) { + const readStream = await drive.createReadStream('hello', params) + const content = await new Promise((resolve, reject) => { + collectStream(readStream, (err, bufs) => { + if (err) return reject(err) + return resolve(Buffer.concat(bufs)) + }) + }) + t.same(content.toString('utf8'), value) + } + + await drive.close() + } catch (err) { + t.fail(err) + } + + await cleanup() + t.end() +}) + test('reading an invalid file propogates error', async t => { const { client, cleanup } = await createOne() From 9b3b98c2f37258ce15b94f003cbe69e0125e9de2 Mon Sep 17 00:00:00 2001 From: Andrew Osheroff Date: Sat, 6 Jun 2020 20:24:32 +0200 Subject: [PATCH 08/28] 1.13.18 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 0328db2..486b931 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "hyperdrive-daemon", - "version": "1.13.17", + "version": "1.13.18", "description": "A FUSE-mountable distributed filesystem, built on Hyperdrive", "main": "index.js", "bin": { From 9e9925636fe79500e454fa31df8b17850594a504 Mon Sep 17 00:00:00 2001 From: Andrew Osheroff Date: Sun, 7 Jun 2020 00:03:03 +0200 Subject: [PATCH 09/28] Extend createReadStream tests + cleanup --- test/hyperdrive.js | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/test/hyperdrive.js b/test/hyperdrive.js index 3c57c9e..32a9283 100644 --- a/test/hyperdrive.js +++ b/test/hyperdrive.js @@ -204,6 +204,14 @@ test('assorted read parameters to createReadStream', async t => { { params: { start: 4, end: 10 }, value: complete.slice(4, 10 + 1) + }, + { + params: { end: complete.length - 1 }, + value: complete + }, + { + params: { start: 5, length: 5 }, + value: complete.slice(5, 10) } ] @@ -218,8 +226,6 @@ test('assorted read parameters to createReadStream', async t => { writeStream.on('finish', resolve) }) - console.log('wrote blocks') - for (let { params, value } of tests) { const readStream = await drive.createReadStream('hello', params) const content = await new Promise((resolve, reject) => { From a5ff42f2c36fc2a59e7da02324a93f4346a16422 Mon Sep 17 00:00:00 2001 From: Andrew Osheroff Date: Wed, 20 May 2020 12:48:36 +0200 Subject: [PATCH 10/28] Add mirroring --- lib/drives/index.js | 90 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 1 deletion(-) diff --git a/lib/drives/index.js b/lib/drives/index.js index 1b38ac8..c74b04b 100644 --- a/lib/drives/index.js +++ b/lib/drives/index.js @@ -14,6 +14,7 @@ const { fromStat, fromMount, fromMetadata, + fromDriveConfiguration, fromNetworkConfiguration, toHyperdriveOptions, toStat, @@ -47,6 +48,7 @@ class DriveManager extends EventEmitter { this._driveIndex = sub(this.db, 'drives', { valueEncoding: bjson }) this._seedIndex = sub(this.db, 'seeding', { valueEncoding: 'json' }) this._namespaceIndex = sub(this.db, 'namespaces', { valueEncoding: 'utf8' }) + this._mirrorIndex = sub(this.db, 'mirrors', { valueEncoding: 'utf8' }) this._drives = new Map() this._checkouts = new Map() @@ -56,6 +58,8 @@ class DriveManager extends EventEmitter { this._configuredMounts = new Set() this._sessions = new ArrayIndex() this._downloads = new ArrayIndex() + this._mirrorRanges = new Map() + this._mirrorSessions = new Map() this._watchCount = 0 this._readyPromise = null @@ -63,7 +67,8 @@ class DriveManager extends EventEmitter { this.ready = () => { if (this._readyPromise) return this._readyPromise this._readyPromise = Promise.all([ - this._rejoin() + this._rejoin(), + this._remirror() ]) return this._readyPromise } @@ -79,6 +84,14 @@ class DriveManager extends EventEmitter { } } + async _remirror () { + const mirrorList = await collect(this._mirrorIndex) + for (const { key } of mirrorList) { + const drive = await this.get({ key }) + await this.configureDrive(drive, { sparse: false }) + } + } + _generateKeyString (key, opts) { var keyString = (key instanceof Buffer) ? key.toString('hex') : key if (opts && opts.version) keyString = keyString + '+' + opts.version @@ -109,6 +122,60 @@ class DriveManager extends EventEmitter { return namespace } + async _startMirroring (drive) { + const driveKey = drive.key.toString('hex') + const mounts = await new Promise((resolve, reject) => { + drive.getAllMounts({ memory: false, recursive: true }, (err, mounts) => { + if (err) return reject(err) + return resolve(mounts) + }) + }) + + // A mirrored drive should never be closed. + const { session: mirrorSession } = await this.createSession(drive) + this._mirrorSessions.set(driveKey, mirrorSession) + + for (const [path, { metadata, content }] of mounts) { + const metadataKey = metadata.key.toString('hex') + const contentKey = content.key.toString('hex') + if (!this._mirrorRanges.has(metadataKey)) { + const range = metadata.download() + this._mirrorRanges.set(metadataKey, { core: metadata, range }) + } + if (!this._mirrorRanges.has(contentKey)) { + const range = content.download() + this._mirrorRanges.set(contentKey, { core: content, range }) + } + } + } + + async _stopMirroring (drive) { + const driveKey = drive.key.toString('hex') + const mounts = await new Promise((resolve, reject) => { + drive.getAllMounts({ memory: false, recursive: true }, (err, mounts) => { + if (err) return reject(err) + return resolve(mounts) + }) + }) + for (const [path, { metadata, content }] of mounts) { + const metadataKey = metadata.key.toString('hex') + const contentKey = content.key.toString('hex') + if (this._mirrorRanges.has(metadataKey)) { + const { core, range } = this._mirrorRanges.get(metadataKey) + core.undownload(range) + this._mirrorRanges.delete(metadataKey) + } + if (this._mirrorRanges.has(contentKey)) { + const { core, range } = this._mirrorRanges.get(contentKey) + core.undownload(range) + this._mirrorRanges.delete(contentKey) + } + } + if (this._mirrorSessions.has(driveKey)) { + await this.closeSession(this._mirrorSessions.get(driveKey)) + } + } + driveForSession (sessionId) { const drive = this._sessions.get(sessionId) if (!drive) throw new Error('Session does not exist.') @@ -388,6 +455,14 @@ class DriveManager extends EventEmitter { return checkout || drive } + async configureDrive (drive, opts = {}) { + if (opts.sparse === false) { + await this._startMirroring(drive) + } else { + await this._stopMirroring(drive) + } + } + async configureNetwork (feed, opts = {}) { const self = this const encodedKey = datEncoding.encode(feed.discoveryKey) @@ -532,6 +607,19 @@ class DriveManager extends EventEmitter { return rsp }, + configure: async (call) => { + const id = call.request.getId() + + if (!id) throw new Error('A configuration request must specify a session ID.') + const drive = this.driveForSession(id) + const opts = fromDriveConfiguration(call.request.getConfig()) + + await this.configureDrive(drive, { ...opts }) + + const rsp = new rpc.drive.messages.ConfigureDriveResponse() + return rsp + }, + configureNetwork: async (call) => { const id = call.request.getId() From 1c7f118ddec7d5af936cc7f29897c334f57d6a37 Mon Sep 17 00:00:00 2001 From: Andrew Osheroff Date: Thu, 28 May 2020 19:14:37 +0200 Subject: [PATCH 11/28] Refactoring RPC method handling --- index.js | 92 +-- lib/common.js | 14 + lib/cores/index.js | 96 +++ lib/drives/index.js | 1213 +++++++++++++++++--------------------- lib/peers/index.js | 10 +- lib/peersockets/index.js | 6 - package.json | 2 +- 7 files changed, 714 insertions(+), 719 deletions(-) create mode 100644 lib/cores/index.js diff --git a/index.js b/index.js index 6b6d421..77e16a0 100644 --- a/index.js +++ b/index.js @@ -22,6 +22,7 @@ const PeersManager = require('./lib/peers') const DebugManager = require('./lib/debug') const FuseManager = require('./lib/fuse') const { serverError } = require('./lib/errors') +const { getHandlers } = require('./lib/common') const log = require('./lib/log').child({ component: 'server' }) @@ -261,51 +262,52 @@ class HyperdriveDaemon extends EventEmitter { }) } - createMainHandlers () { - return { - status: async call => { - const rsp = new rpc.main.messages.StatusResponse() - rsp.setApiversion(apiVersion) - rsp.setUptime(Date.now() - this._startTime) - if (this._versions) { - rsp.setDaemonversion(this._versions.daemon) - rsp.setClientversion(this._versions.client) - rsp.setSchemaversion(this._versions.schema) - rsp.setHyperdriveversion(this._versions.hyperdrive) - rsp.setNoisekey(this.noiseKeyPair.publicKey) - - const swarm = this.networking && this.networking.swarm - if (swarm) { - const remoteAddress = swarm.remoteAddress() - rsp.setHolepunchable(swarm.holepunchable()) - rsp.setRemoteaddress(remoteAddress ? remoteAddress.host + ':' + remoteAddress.port : '') - } + // RPC Methods + + async _rpcStatus (call) { + const rsp = new rpc.main.messages.StatusResponse() + rsp.setApiversion(apiVersion) + rsp.setUptime(Date.now() - this._startTime) + if (this._versions) { + rsp.setDaemonversion(this._versions.daemon) + rsp.setClientversion(this._versions.client) + rsp.setSchemaversion(this._versions.schema) + rsp.setHyperdriveversion(this._versions.hyperdrive) + rsp.setNoisekey(this.noiseKeyPair.publicKey) + + const swarm = this.networking && this.networking.swarm + if (swarm) { + const remoteAddress = swarm.remoteAddress() + rsp.setHolepunchable(swarm.holepunchable()) + rsp.setRemoteaddress(remoteAddress ? remoteAddress.host + ':' + remoteAddress.port : '') + } - if (this._versions.fuseNative) rsp.setFusenativeversion(this._versions.fuseNative) - if (this._versions.hyperdriveFuse) rsp.setHyperdrivefuseversion(this._versions.hyperdriveFuse) + if (this._versions.fuseNative) rsp.setFusenativeversion(this._versions.fuseNative) + if (this._versions.hyperdriveFuse) rsp.setHyperdrivefuseversion(this._versions.hyperdriveFuse) - if (hyperfuse) { - rsp.setFuseavailable(true) - rsp.setFuseconfigured(this.fuse.fuseConfigured) - } else { - rsp.setFuseavailable(false) - rsp.setFuseconfigured(false) - } - } - return rsp - }, - refreshFuse: async call => { - await this.fuse.ready() - if (this.fuse && this.fuse.fuseConfigured) { - hyperfuse = require('hyperdrive-fuse') - this._versions.fuseNative = require('fuse-native/package.json').version - this._versions.hyperdriveFuse = require('hyperdrive-fuse/package.json').version - } - return new rpc.main.messages.FuseRefreshResponse() + if (hyperfuse) { + rsp.setFuseavailable(true) + rsp.setFuseconfigured(this.fuse.fuseConfigured) + } else { + rsp.setFuseavailable(false) + rsp.setFuseconfigured(false) } } + return rsp } + async _rpcRefreshFuse (call) { + await this.fuse.ready() + if (this.fuse && this.fuse.fuseConfigured) { + hyperfuse = require('hyperdrive-fuse') + this._versions.fuseNative = require('fuse-native/package.json').version + this._versions.hyperdriveFuse = require('hyperdrive-fuse/package.json').version + } + return new rpc.main.messages.FuseRefreshResponse() + } + + // Public Methods + get uptime () { if (!this._startTime) return 0 return Date.now() - this._startTime @@ -370,24 +372,24 @@ class HyperdriveDaemon extends EventEmitter { this.server = new grpc.Server() this.server.addService(rpc.fuse.services.FuseService, { - ...wrap(this.metadata, this.fuse.getHandlers(), { authenticate: true }) + ...wrap(this.metadata, getHandlers(this.fuse), { authenticate: true }) }) this.server.addService(rpc.drive.services.DriveService, { - ...wrap(this.metadata, this.drives.getHandlers(), { authenticate: true }) + ...wrap(this.metadata, getHandlers(this.drives), { authenticate: true }) }) this.server.addService(rpc.peersockets.services.PeersocketsService, { - ...wrap(this.metadata, this.peersockets.getHandlers(), { authenticate: true }) + ...wrap(this.metadata, getHandlers(this.peersockets), { authenticate: true }) }) this.server.addService(rpc.peers.services.PeersService, { - ...wrap(this.metadata, this.peers.getHandlers(), { authenticate: true }) + ...wrap(this.metadata, getHandlers(this.peers), { authenticate: true }) }) if (this.debug) { this.server.addService(rpc.debug.services.DebugService, { - ...wrap(this.metadata, this.debug.getHandlers(), { authenticate: true }) + ...wrap(this.metadata, getHandlers(this.debug), { authenticate: true }) }) } this.server.addService(rpc.main.services.HyperdriveService, { - ...wrap(this.metadata, this.createMainHandlers(), { authenticate: true }) + ...wrap(this.metadata, getHandlers(this), { authenticate: true }) }) await new Promise((resolve, reject) => { diff --git a/lib/common.js b/lib/common.js index e69de29..18bae12 100644 --- a/lib/common.js +++ b/lib/common.js @@ -0,0 +1,14 @@ +function getHandlers (manager) { + const handlers = {} + const rpcMethods = Object.getOwnPropertyNames(manager.__proto__).filter(methodName => methodName.startsWith('_rpc')) + for (let methodName of rpcMethods) { + let rpcMethodName = methodName.slice(4) + rpcMethodName = rpcMethodName.charCodeAt(0).toLowerCase() + rpcMethodName.slice(1) + handlers[rpcMethodName] = manager[methodName].bind(manager) + } + return handlers +} + +module.exports = { + getHandlers +} diff --git a/lib/cores/index.js b/lib/cores/index.js new file mode 100644 index 0000000..6161a1c --- /dev/null +++ b/lib/cores/index.js @@ -0,0 +1,96 @@ +const { EventEmitter } = require('events') +const sub = require('subleveldown') +const bjson = require('buffer-json-encoding') +const datEncoding = require('dat-encoding') + +const log = require('../log').child({ component: 'core-manager' }) + +class CoreManager extends EventEmitter { + constructor (corestore, networking, db, opts = {}) { + super() + + this.corestore = corestore + this.networking = networking + this.db = db + + this._seedIndex = sub(this.db, 'seeding', { valueEncoding: 'json' }) + this._mirrorIndex = sub(this.db, 'mirrors', { valueEncoding: 'utf8' }) + } + + async _rejoin () { + if (this.noAnnounce) return + const seedList = await collect(this._seedIndex) + for (const { key: discoveryKey, value: networkOpts } of seedList) { + const opts = networkOpts && networkOpts.opts + if (!opts || !opts.announce) continue + this.networking.join(discoveryKey, { ...networkOpts.opts, loadForLength: true }) + } + } + + async configureNetwork (feed, opts = {}) { + const self = this + const encodedKey = datEncoding.encode(feed.discoveryKey) + const networkOpts = { + lookup: !!opts.lookup, + announce: !!opts.announce, + remember: !!opts.remember + } + const seeding = opts.lookup || opts.announce + var networkingPromise + + const sameConfig = sameNetworkConfig(feed.discoveryKey, opts) + // If all the networking options are the same, exit early. + if (sameConfig) return + + const networkConfig = { key: datEncoding.encode(feed.key), opts: networkOpts } + if (opts.remember) { + if (seeding) await this._seedIndex.put(encodedKey, networkConfig) + else await this._seedIndex.del(encodedKey) + } else { + this._transientSeedIndex.set(encodedKey, networkConfig) + } + + // Failsafe + if (networkOpts.announce && this.noAnnounce) networkOpts.announce = false + + try { + if (seeding) { + networkingPromise = this.networking.join(feed.discoveryKey, networkOpts) + } else { + networkingPromise = this.networking.leave(feed.discoveryKey) + } + networkingPromise.then(configurationSuccess) + networkingPromise.catch(configurationError) + } catch (err) { + configurationError(err) + } + + function sameNetworkConfig (discoveryKey, opts = {}) { + const swarmStatus = self.networking.status(discoveryKey) + if (!swarmStatus) return opts.lookup === false && opts.announce === false + return swarmStatus.announce === opts.announce && swarmStatus.lookup === opts.lookup + } + + function configurationError (err) { + log.error({ err, discoveryKey: encodedKey }, 'network configuration error') + } + + function configurationSuccess () { + log.debug({ discoveryKey: encodedKey }, 'network configuration succeeded') + } + } + + async getNetworkConfiguration (drive) { + const encodedKey = datEncoding.encode(drive.discoveryKey) + const networkOpts = this._transientSeedIndex.get(encodedKey) + if (networkOpts) return networkOpts.opts + try { + const persistentOpts = await this._seedIndex.get(encodedKey) + return persistentOpts.opts + } catch (err) { + return null + } + } +} + +module.exports = CoreManager diff --git a/lib/drives/index.js b/lib/drives/index.js index c74b04b..55bc42f 100644 --- a/lib/drives/index.js +++ b/lib/drives/index.js @@ -1,7 +1,7 @@ -const crypto = require('crypto') const { EventEmitter } = require('events') const hyperdrive = require('hyperdrive') +const hypercoreCrypto = require('hypercore-crypto') const collectStream = require('stream-collector') const sub = require('subleveldown') const bjson = require('buffer-json-encoding') @@ -27,17 +27,17 @@ const { toChunks } = require('hyperdrive-daemon-client/lib/common') const { rpc } = require('hyperdrive-daemon-client') -const ArrayIndex = require('./array-index.js') +const ArrayIndex = require('./array-index.js') const log = require('../log').child({ component: 'drive-manager' }) const TRIE_UPDATER_SYMBOL = Symbol('hyperdrive-daemon-trie-updater') class DriveManager extends EventEmitter { - constructor (corestore, networking, db, opts = {}) { + constructor (cores, networking, db, opts = {}) { super() - this.corestore = corestore + this.cores = cores this.networking = networking this.db = db this.opts = opts @@ -47,7 +47,6 @@ class DriveManager extends EventEmitter { this._driveIndex = sub(this.db, 'drives', { valueEncoding: bjson }) this._seedIndex = sub(this.db, 'seeding', { valueEncoding: 'json' }) - this._namespaceIndex = sub(this.db, 'namespaces', { valueEncoding: 'utf8' }) this._mirrorIndex = sub(this.db, 'mirrors', { valueEncoding: 'utf8' }) this._drives = new Map() @@ -63,11 +62,9 @@ class DriveManager extends EventEmitter { this._watchCount = 0 this._readyPromise = null - this.ready = () => { if (this._readyPromise) return this._readyPromise this._readyPromise = Promise.all([ - this._rejoin(), this._remirror() ]) return this._readyPromise @@ -99,29 +96,6 @@ class DriveManager extends EventEmitter { return keyString } - async _getNamespace (keyString) { - if (!keyString) return null - try { - const namespace = await this._namespaceIndex.get('by-drive/' + keyString) - return namespace - } catch (err) { - if (!err.notFound) throw err - return null - } - } - - async _createNamespace (keyString) { - const namespace = crypto.randomBytes(32).toString('hex') - try { - var existing = await this._namespaceIndex.get('by-namespace/' + namespace) - } catch (err) { - if (!err.notFound) throw err - existing = null - } - if (existing) return this._createNamespace(keyString) - return namespace - } - async _startMirroring (drive) { const driveKey = drive.key.toString('hex') const mounts = await new Promise((resolve, reject) => { @@ -135,7 +109,7 @@ class DriveManager extends EventEmitter { const { session: mirrorSession } = await this.createSession(drive) this._mirrorSessions.set(driveKey, mirrorSession) - for (const [path, { metadata, content }] of mounts) { + for (const [, { metadata, content }] of mounts) { const metadataKey = metadata.key.toString('hex') const contentKey = content.key.toString('hex') if (!this._mirrorRanges.has(metadataKey)) { @@ -157,7 +131,7 @@ class DriveManager extends EventEmitter { return resolve(mounts) }) }) - for (const [path, { metadata, content }] of mounts) { + for (const [, { metadata, content }] of mounts) { const metadataKey = metadata.key.toString('hex') const contentKey = content.key.toString('hex') if (this._mirrorRanges.has(metadataKey)) { @@ -335,11 +309,10 @@ class DriveManager extends EventEmitter { var unlisteners = [] if (!drive) { - var namespace = await this._getNamespace(keyString) - if (!namespace) namespace = await this._createNamespace(keyString) - drive = hyperdrive(this.corestore, key, { - ...driveOpts, - namespace + const randomNamespace = hypercoreCrypto.randomBytes(32).toString('hex') + drive = hyperdrive(this.cores.corestore, key, { + namespace: randomNamespace, + ...driveOpts }) const errorListener = err => log.error(err) @@ -380,13 +353,6 @@ class DriveManager extends EventEmitter { key = datEncoding.encode(drive.key) keyString = this._generateKeyString(key, opts) - if (namespace) { - await this._namespaceIndex.batch([ - { type: 'put', key: 'by-namespace/' + namespace, value: keyString }, - { type: 'put', key: 'by-drive/' + keyString, value: namespace } - ]) - } - var initialConfig // TODO: Need to fully work through all the default networking behaviors. if (opts.fuseNetwork) { @@ -463,737 +429,668 @@ class DriveManager extends EventEmitter { } } - async configureNetwork (feed, opts = {}) { - const self = this - const encodedKey = datEncoding.encode(feed.discoveryKey) - const networkOpts = { - lookup: !!opts.lookup, - announce: !!opts.announce, - remember: !!opts.remember - } - const seeding = opts.lookup || opts.announce - var networkingPromise + download (drive, path) { + const dl = drive.download(path) + return this._downloads.insert(dl) + } - const sameConfig = sameNetworkConfig(feed.discoveryKey, opts) - // If all the networking options are the same, exit early. - if (sameConfig) return + // RPC Methods + async _rpcVersion (call) { + const id = call.request.getId() - const networkConfig = { key: datEncoding.encode(feed.key), opts: networkOpts } - if (opts.remember) { - if (seeding) await this._seedIndex.put(encodedKey, networkConfig) - else await this._seedIndex.del(encodedKey) - } else { - this._transientSeedIndex.set(encodedKey, networkConfig) - } + if (!id) throw new Error('A version request must specify a session ID.') + const drive = this.driveForSession(id) - // Failsafe - if (networkOpts.announce && this.noAnnounce) networkOpts.announce = false + const rsp = new rpc.drive.messages.DriveVersionResponse() + rsp.setVersion(drive.version) - try { - if (seeding) { - networkingPromise = this.networking.join(feed.discoveryKey, networkOpts) - } else { - networkingPromise = this.networking.leave(feed.discoveryKey) - } - networkingPromise.then(configurationSuccess) - networkingPromise.catch(configurationError) - } catch (err) { - configurationError(err) - } + return rsp + } - function sameNetworkConfig (discoveryKey, opts = {}) { - const swarmStatus = self.networking.status(discoveryKey) - if (!swarmStatus) return opts.lookup === false && opts.announce === false - return swarmStatus.announce === opts.announce && swarmStatus.lookup === opts.lookup - } + async _rpcGet (call) { + var driveOpts = fromHyperdriveOptions(call.request.getOpts()) - function configurationError (err) { - log.error({ err, discoveryKey: encodedKey }, 'network configuration error') - } + const { drive, session } = await this.createSession(null, driveOpts.key, driveOpts) + driveOpts.key = drive.key + driveOpts.discoveryKey = drive.discoveryKey + driveOpts.version = drive.version + driveOpts.writable = drive.writable - function configurationSuccess () { - log.debug({ discoveryKey: encodedKey }, 'network configuration succeeded') - } - } + const rsp = new rpc.drive.messages.GetDriveResponse() + rsp.setId(session) + rsp.setOpts(toHyperdriveOptions(driveOpts)) - async getNetworkConfiguration (drive) { - const encodedKey = datEncoding.encode(drive.discoveryKey) - const networkOpts = this._transientSeedIndex.get(encodedKey) - if (networkOpts) return networkOpts.opts - try { - const persistentOpts = await this._seedIndex.get(encodedKey) - return persistentOpts.opts - } catch (err) { - return null - } + return rsp } - download (drive, path) { - const dl = drive.download(path) - return this._downloads.insert(dl) + async _rpcAllStats (call) { + const networkingOnly = call.request.getNetworkingonly() + var stats = await this.getAllStats({ networkingOnly }) + stats = stats.map(driveStats => toDriveStats(driveStats)) + + const rsp = new rpc.drive.messages.StatsResponse() + rsp.setStatsList(stats) + + return rsp } - getHandlers () { - return { - version: async (call) => { - const id = call.request.getId() + async _rpcAllNetworkConfigurations (call) { + const networkConfigurations = await this.getAllNetworkConfigurations() - if (!id) throw new Error('A version request must specify a session ID.') - const drive = this.driveForSession(id) + const rsp = new rpc.drive.messages.NetworkConfigurationsResponse() + rsp.setConfigurationsList([...networkConfigurations].map(([, value]) => toNetworkConfiguration({ + ...value.opts, + key: Buffer.from(value.key, 'hex') + }))) - const rsp = new rpc.drive.messages.DriveVersionResponse() - rsp.setVersion(drive.version) - - return rsp - }, - - get: async (call) => { - var driveOpts = fromHyperdriveOptions(call.request.getOpts()) - - const { drive, session } = await this.createSession(null, driveOpts.key, driveOpts) - driveOpts.key = drive.key - driveOpts.discoveryKey = drive.discoveryKey - driveOpts.version = drive.version - driveOpts.writable = drive.writable - - const rsp = new rpc.drive.messages.GetDriveResponse() - rsp.setId(session) - rsp.setOpts(toHyperdriveOptions(driveOpts)) - - return rsp - }, - - allStats: async (call) => { - const networkingOnly = call.request.getNetworkingonly() - var stats = await this.getAllStats({ networkingOnly }) - stats = stats.map(driveStats => toDriveStats(driveStats)) - - const rsp = new rpc.drive.messages.StatsResponse() - rsp.setStatsList(stats) - - return rsp - }, - - allNetworkConfigurations: async (call) => { - const networkConfigurations = await this.getAllNetworkConfigurations() - - const rsp = new rpc.drive.messages.NetworkConfigurationsResponse() - rsp.setConfigurationsList([...networkConfigurations].map(([, value]) => toNetworkConfiguration({ - ...value.opts, - key: Buffer.from(value.key, 'hex') - }))) - - return rsp - }, - - peerCounts: async (call) => { - const rsp = new rpc.drive.messages.PeerCountsResponse() - const keys = call.request.getKeysList() - if (!keys) return rsp - - const counts = [] - for (let key of keys) { - key = Buffer.from(key) - if (this.corestore.isLoaded(key)) { - const core = this.corestore.get(key) - const openPeers = core.peers.filter(p => p.remoteOpened) - counts.push(openPeers.length) - } else { - counts.push(0) - } - } + return rsp + } - rsp.setPeercountsList(counts) - return rsp - }, + async _rpcPeerCounts (call) { + const rsp = new rpc.drive.messages.PeerCountsResponse() + const keys = call.request.getKeysList() + if (!keys) return rsp + + const counts = [] + for (let key of keys) { + key = Buffer.from(key) + if (this.corestore.isLoaded(key)) { + const core = this.corestore.get(key) + const openPeers = core.peers.filter(p => p.remoteOpened) + counts.push(openPeers.length) + } else { + counts.push(0) + } + } - configure: async (call) => { - const id = call.request.getId() + rsp.setPeercountsList(counts) + return rsp + } - if (!id) throw new Error('A configuration request must specify a session ID.') - const drive = this.driveForSession(id) - const opts = fromDriveConfiguration(call.request.getConfig()) + async _rpcConfigure (call) { + const id = call.request.getId() - await this.configureDrive(drive, { ...opts }) + if (!id) throw new Error('A configuration request must specify a session ID.') + const drive = this.driveForSession(id) + const opts = fromDriveConfiguration(call.request.getConfig()) - const rsp = new rpc.drive.messages.ConfigureDriveResponse() - return rsp - }, + await this.configureDrive(drive, { ...opts }) - configureNetwork: async (call) => { - const id = call.request.getId() + const rsp = new rpc.drive.messages.ConfigureDriveResponse() + return rsp + } - if (!id) throw new Error('A network configuration request must specify a session ID.') - const drive = this.driveForSession(id) - const opts = fromNetworkConfiguration(call.request.getNetwork()) + async _rpcConfigureNetwork (call) { + const id = call.request.getId() - await this.configureNetwork(drive.metadata, { ...opts }) + if (!id) throw new Error('A network configuration request must specify a session ID.') + const drive = this.driveForSession(id) + const opts = fromNetworkConfiguration(call.request.getNetwork()) - const rsp = new rpc.drive.messages.ConfigureNetworkResponse() - return rsp - }, + await this.configureNetwork(drive.metadata, { ...opts }) - stats: async (call) => { - const id = call.request.getId() + const rsp = new rpc.drive.messages.ConfigureNetworkResponse() + return rsp + } - if (!id) throw new Error('A stats request must specify a session ID.') - const drive = this.driveForSession(id) + async _rpcStats (call) { + const id = call.request.getId() - const recursive = call.request.getRecursive() - const networkingOnly = call.request.getNetworkingonly() - const driveStats = await this.getDriveStats(drive, { recursive, networkingOnly }) - const networkConfig = await this.getNetworkConfiguration(drive) + if (!id) throw new Error('A stats request must specify a session ID.') + const drive = this.driveForSession(id) - const rsp = new rpc.drive.messages.DriveStatsResponse() - rsp.setStats(toDriveStats(driveStats)) - if (networkConfig) rsp.setNetwork(toNetworkConfiguration(networkConfig)) - return rsp - }, + const recursive = call.request.getRecursive() + const networkingOnly = call.request.getNetworkingonly() + const driveStats = await this.getDriveStats(drive, { recursive, networkingOnly }) + const networkConfig = await this.getNetworkConfiguration(drive) - download: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() + const rsp = new rpc.drive.messages.DriveStatsResponse() + rsp.setStats(toDriveStats(driveStats)) + if (networkConfig) rsp.setNetwork(toNetworkConfiguration(networkConfig)) + return rsp + } - if (!id) throw new Error('A download request must specify a session ID.') - const drive = this.driveForSession(id) - const downloadId = this.download(drive, path) + async _rpcDownload (call) { + const id = call.request.getId() + const path = call.request.getPath() - const rsp = new rpc.drive.messages.DownloadResponse() - rsp.setDownloadid(downloadId) - return rsp - }, + if (!id) throw new Error('A download request must specify a session ID.') + const drive = this.driveForSession(id) + const downloadId = this.download(drive, path) - undownload: async (call) => { - const id = call.request.getId() - const downloadId = call.request.getDownloadid() + const rsp = new rpc.drive.messages.DownloadResponse() + rsp.setDownloadid(downloadId) + return rsp + } - if (!id) throw new Error('An undownload request must specify a session ID.') - if (!downloadId) throw new Error('An undownload request must specify a download ID.') + async _rpcUndownload (call) { + const id = call.request.getId() + const downloadId = call.request.getDownloadid() - const dl = this._downloads.get(downloadId) - if (dl) dl.destroy() - this._downloads.delete(downloadId) + if (!id) throw new Error('An undownload request must specify a session ID.') + if (!downloadId) throw new Error('An undownload request must specify a download ID.') - return new rpc.drive.messages.UndownloadResponse() - }, + const dl = this._downloads.get(downloadId) + if (dl) dl.destroy() + this._downloads.delete(downloadId) - createDiffStream: async (call) => { - const id = call.request.getId() - const prefix = call.request.getPrefix() - const otherVersion = call.request.getOther() + return new rpc.drive.messages.UndownloadResponse() + } - if (!id) throw new Error('A diff stream request must specify a session ID.') - const drive = this.driveForSession(id) + async _rpcCreateDiffStream (call) { + const id = call.request.getId() + const prefix = call.request.getPrefix() + const otherVersion = call.request.getOther() - const stream = drive.createDiffStream(otherVersion, prefix) + if (!id) throw new Error('A diff stream request must specify a session ID.') + const drive = this.driveForSession(id) - const rspMapper = new Transform({ - transform (chunk, cb) { - const rsp = new rpc.drive.messages.DiffStreamResponse() - if (!chunk) return rsp + const stream = drive.createDiffStream(otherVersion, prefix) - const { name, type, value } = chunk - rsp.setType(type) - rsp.setName(name) - if (type === 'put') { - rsp.setValue(toDiffEntry({ stat: value })) - } else { - rsp.setValue(toDiffEntry({ mount: value })) - } + const rspMapper = new Transform({ + transform (chunk, cb) { + const rsp = new rpc.drive.messages.DiffStreamResponse() + if (!chunk) return rsp - return cb(null, rsp) - } - }) + const { name, type, value } = chunk + rsp.setType(type) + rsp.setName(name) + if (type === 'put') { + rsp.setValue(toDiffEntry({ stat: value })) + } else { + rsp.setValue(toDiffEntry({ mount: value })) + } - pump(stream, rspMapper, call, err => { - if (err) { - log.error({ id, err }, 'createDiffStream error') - call.destroy(err) - } - }) - }, + return cb(null, rsp) + } + }) - createReadStream: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() - const start = call.request.getStart() - var end = call.request.getEnd() - const length = call.request.getLength() + pump(stream, rspMapper, call, err => { + if (err) { + log.error({ id, err }, 'createDiffStream error') + call.destroy(err) + } + }) + } - if (!id) throw new Error('A createReadStream request must specify a session ID.') - if (!path) throw new Error('A createReadStream request must specify a path.') - const drive = this.driveForSession(id) + async _rpcCreateReadStream (call) { + const id = call.request.getId() + const path = call.request.getPath() + const start = call.request.getStart() + var end = call.request.getEnd() + const length = call.request.getLength() + + if (!id) throw new Error('A createReadStream request must specify a session ID.') + if (!path) throw new Error('A createReadStream request must specify a path.') + const drive = this.driveForSession(id) + + const streamOpts = {} + if (end !== 0) streamOpts.end = end + if (length !== 0) streamOpts.length = length + streamOpts.start = start + const stream = drive.createReadStream(path, streamOpts) + + const rspMapper = new Transform({ + transform (chunk, cb) { + const rsp = new rpc.drive.messages.ReadStreamResponse() + rsp.setChunk(chunk) + return cb(null, rsp) + } + }) - const streamOpts = {} - if (end !== 0) streamOpts.end = end - if (length !== 0) streamOpts.length = length - streamOpts.start = start + pump(stream, rspMapper, call, err => { + if (err) { + log.error({ id, err }, 'createReadStream error') + call.destroy(err) + } + }) + } - const stream = drive.createReadStream(path, streamOpts) + async _rpcReadFile (call) { + const id = call.request.getId() + const path = call.request.getPath() - const rspMapper = new Transform({ - transform (chunk, cb) { - const rsp = new rpc.drive.messages.ReadStreamResponse() - rsp.setChunk(chunk) - return cb(null, rsp) - } - }) + if (!id) throw new Error('A readFile request must specify a session ID.') + if (!path) throw new Error('A readFile request must specify a path.') + const drive = this.driveForSession(id) - pump(stream, rspMapper, call, err => { - if (err) { - log.error({ id, err }, 'createReadStream error') - call.destroy(err) - } - }) - }, + const content = await new Promise((resolve, reject) => { + drive.readFile(path, (err, content) => { + if (err) return reject(err) + return resolve(content) + }) + }) - readFile: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() + const chunks = toChunks(content) + for (const chunk of chunks) { + const rsp = new rpc.drive.messages.ReadFileResponse() + rsp.setChunk(chunk) + call.write(rsp) + } + call.end() + } + + async _rpcCreateWriteStream (call) { + const unpack = new Transform({ + transform (msg, cb) { + const chunk = msg.getChunk() + return cb(null, Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)) + } + }) + + return new Promise((resolve, reject) => { + call.once('data', req => { + const id = req.getId() + const path = req.getPath() + const opts = fromStat(req.getOpts()) if (!id) throw new Error('A readFile request must specify a session ID.') if (!path) throw new Error('A readFile request must specify a path.') const drive = this.driveForSession(id) - const content = await new Promise((resolve, reject) => { - drive.readFile(path, (err, content) => { - if (err) return reject(err) - return resolve(content) - }) - }) - - const chunks = toChunks(content) - for (const chunk of chunks) { - const rsp = new rpc.drive.messages.ReadFileResponse() - rsp.setChunk(chunk) - call.write(rsp) - } - call.end() - }, - - createWriteStream: async (call) => { - const unpack = new Transform({ - transform (msg, cb) { - const chunk = msg.getChunk() - return cb(null, Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)) - } - }) - - return new Promise((resolve, reject) => { - call.once('data', req => { - const id = req.getId() - const path = req.getPath() - const opts = fromStat(req.getOpts()) + const stream = drive.createWriteStream(path, { mode: opts.mode, uid: opts.uid, gid: opts.gid, metadata: opts.metadata }) - if (!id) throw new Error('A readFile request must specify a session ID.') - if (!path) throw new Error('A readFile request must specify a path.') - const drive = this.driveForSession(id) - - const stream = drive.createWriteStream(path, { mode: opts.mode, uid: opts.uid, gid: opts.gid, metadata: opts.metadata }) + return onstream(resolve, reject, stream) + }) + }) - return onstream(resolve, reject, stream) - }) - }) + function onstream (resolve, reject, stream) { + pump(call, unpack, stream, err => { + if (err) return reject(err) + const rsp = new rpc.drive.messages.WriteStreamResponse() + return resolve(rsp) + }) + } + } - function onstream (resolve, reject, stream) { - pump(call, unpack, stream, err => { - if (err) return reject(err) - const rsp = new rpc.drive.messages.WriteStreamResponse() - return resolve(rsp) - }) - } - }, + async _rpcWriteFile (call) { + return new Promise((resolve, reject) => { + call.once('data', req => { + const id = req.getId() + const path = req.getPath() + const opts = fromStat(req.getOpts()) - writeFile: async (call) => { - return new Promise((resolve, reject) => { - call.once('data', req => { - const id = req.getId() - const path = req.getPath() - const opts = fromStat(req.getOpts()) + if (!id) throw new Error('A writeFile request must specify a session ID.') + if (!path) throw new Error('A writeFile request must specify a path.') + const drive = this.driveForSession(id) - if (!id) throw new Error('A writeFile request must specify a session ID.') - if (!path) throw new Error('A writeFile request must specify a path.') - const drive = this.driveForSession(id) + return loadContent(resolve, reject, path, drive, opts) + }) + }) - return loadContent(resolve, reject, path, drive, opts) - }) + function loadContent (resolve, reject, path, drive, opts) { + return collectStream(call, (err, reqs) => { + if (err) return reject(err) + const chunks = reqs.map(req => { + const chunk = req.getChunk() + return Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength) }) + return drive.writeFile(path, Buffer.concat(chunks), opts, err => { + if (err) return reject(err) + const rsp = new rpc.drive.messages.WriteFileResponse() + return resolve(rsp) + }) + }) + } + } - function loadContent (resolve, reject, path, drive, opts) { - return collectStream(call, (err, reqs) => { - if (err) return reject(err) - const chunks = reqs.map(req => { - const chunk = req.getChunk() - return Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength) - }) - return drive.writeFile(path, Buffer.concat(chunks), opts, err => { - if (err) return reject(err) - const rsp = new rpc.drive.messages.WriteFileResponse() - return resolve(rsp) - }) - }) - } - }, + async _rpcUpdateMetadata (call) { + const id = call.request.getId() + const path = call.request.getPath() + const metadata = fromMetadata(call.request.getMetadataMap()) - updateMetadata: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() - const metadata = fromMetadata(call.request.getMetadataMap()) + if (!id) throw new Error('A metadata update request must specify a session ID.') + if (!path) throw new Error('A metadata update request must specify a path.') + if (!metadata) throw new Error('A metadata update request must specify metadata.') + const drive = this.driveForSession(id) - if (!id) throw new Error('A metadata update request must specify a session ID.') - if (!path) throw new Error('A metadata update request must specify a path.') - if (!metadata) throw new Error('A metadata update request must specify metadata.') - const drive = this.driveForSession(id) + return new Promise((resolve, reject) => { + drive._update(path, { metadata }, err => { + if (err) return reject(err) + return resolve(new rpc.drive.messages.UpdateMetadataResponse()) + }) + }) + } - return new Promise((resolve, reject) => { - drive._update(path, { metadata }, err => { - if (err) return reject(err) - return resolve(new rpc.drive.messages.UpdateMetadataResponse()) - }) - }) - }, + async _rpcDeleteMetadata (call) { + const id = call.request.getId() + const path = call.request.getPath() + const keys = call.request.getKeysList() - deleteMetadata: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() - const keys = call.request.getKeysList() + if (!id) throw new Error('A metadata update request must specify a session ID.') + if (!path) throw new Error('A metadata update request must specify a path.') + if (!keys) throw new Error('A metadata update request must specify metadata keys.') + const drive = this.driveForSession(id) - if (!id) throw new Error('A metadata update request must specify a session ID.') - if (!path) throw new Error('A metadata update request must specify a path.') - if (!keys) throw new Error('A metadata update request must specify metadata keys.') - const drive = this.driveForSession(id) + const metadata = {} + for (const key of keys) { + metadata[key] = null + } - const metadata = {} - for (const key of keys) { - metadata[key] = null - } + return new Promise((resolve, reject) => { + drive._update(path, { metadata }, err => { + if (err) return reject(err) + return resolve(new rpc.drive.messages.DeleteMetadataResponse()) + }) + }) + } - return new Promise((resolve, reject) => { - drive._update(path, { metadata }, err => { - if (err) return reject(err) - return resolve(new rpc.drive.messages.DeleteMetadataResponse()) - }) - }) - }, + async _rpcStat (call) { + const id = call.request.getId() + const path = call.request.getPath() + const lstat = call.request.getLstat() - stat: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() - const lstat = call.request.getLstat() + if (!id) throw new Error('A stat request must specify a session ID.') + if (!path) throw new Error('A stat request must specify a path.') + const drive = this.driveForSession(id) - if (!id) throw new Error('A stat request must specify a session ID.') - if (!path) throw new Error('A stat request must specify a path.') - const drive = this.driveForSession(id) + const method = lstat ? drive.lstat.bind(drive) : drive.stat.bind(drive) - const method = lstat ? drive.lstat.bind(drive) : drive.stat.bind(drive) + return new Promise((resolve, reject) => { + method(path, (err, stat) => { + if (err) return reject(err) - return new Promise((resolve, reject) => { - method(path, (err, stat) => { - if (err) return reject(err) + const rsp = new rpc.drive.messages.StatResponse() + rsp.setStat(toStat(stat)) - const rsp = new rpc.drive.messages.StatResponse() - rsp.setStat(toStat(stat)) + return resolve(rsp) + }) + }) + } - return resolve(rsp) - }) - }) - }, + async _rpcUnlink (call) { + const id = call.request.getId() + const path = call.request.getPath() - unlink: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() + if (!id) throw new Error('An unlink request must specify a session ID.') + if (!path) throw new Error('An unlink request must specify a path. ') + const drive = this.driveForSession(id) - if (!id) throw new Error('An unlink request must specify a session ID.') - if (!path) throw new Error('An unlink request must specify a path. ') - const drive = this.driveForSession(id) + return new Promise((resolve, reject) => { + drive.unlink(path, err => { + if (err) return reject(err) + const rsp = new rpc.drive.messages.UnlinkResponse() + return resolve(rsp) + }) + }) + } - return new Promise((resolve, reject) => { - drive.unlink(path, err => { - if (err) return reject(err) - const rsp = new rpc.drive.messages.UnlinkResponse() - return resolve(rsp) - }) - }) - }, + async _rpcReaddir (call) { + const id = call.request.getId() + const path = call.request.getPath() + const recursive = call.request.getRecursive() + const noMounts = call.request.getNomounts() + const includeStats = call.request.getIncludestats() - readdir: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() - const recursive = call.request.getRecursive() - const noMounts = call.request.getNomounts() - const includeStats = call.request.getIncludestats() + if (!id) throw new Error('A readdir request must specify a session ID.') + if (!path) throw new Error('A readdir request must specify a path.') + const drive = this.driveForSession(id) - if (!id) throw new Error('A readdir request must specify a session ID.') - if (!path) throw new Error('A readdir request must specify a path.') - const drive = this.driveForSession(id) + return new Promise((resolve, reject) => { + drive.readdir(path, { recursive, noMounts, includeStats }, (err, files) => { + if (err) return reject(err) - return new Promise((resolve, reject) => { - drive.readdir(path, { recursive, noMounts, includeStats }, (err, files) => { - if (err) return reject(err) - - const rsp = new rpc.drive.messages.ReadDirectoryResponse() - if (!includeStats) { - rsp.setFilesList(files) - } else { - const names = [] - const stats = [] - const mounts = [] - const innerPaths = [] - for (const { name, stat, mount, innerPath } of files) { - names.push(name) - stats.push(toStat(stat)) - mounts.push(toMount(mount)) - innerPaths.push(innerPath) - } - rsp.setFilesList(names) - rsp.setStatsList(stats) - rsp.setMountsList(mounts) - rsp.setInnerpathsList(innerPaths) - } - return resolve(rsp) - }) - }) - }, + const rsp = new rpc.drive.messages.ReadDirectoryResponse() + if (!includeStats) { + rsp.setFilesList(files) + } else { + const names = [] + const stats = [] + const mounts = [] + const innerPaths = [] + for (const { name, stat, mount, innerPath } of files) { + names.push(name) + stats.push(toStat(stat)) + mounts.push(toMount(mount)) + innerPaths.push(innerPath) + } + rsp.setFilesList(names) + rsp.setStatsList(stats) + rsp.setMountsList(mounts) + rsp.setInnerpathsList(innerPaths) + } + return resolve(rsp) + }) + }) + } - mkdir: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() - const opts = fromStat(call.request.getOpts()) + async _rpcMkdir (call) { + const id = call.request.getId() + const path = call.request.getPath() + const opts = fromStat(call.request.getOpts()) - if (!id) throw new Error('A mkdir request must specify a session ID.') - if (!path) throw new Error('A mkdir request must specify a directory path.') - const drive = this.driveForSession(id) + if (!id) throw new Error('A mkdir request must specify a session ID.') + if (!path) throw new Error('A mkdir request must specify a directory path.') + const drive = this.driveForSession(id) - const mkdirOpts = {} - if (opts.uid) mkdirOpts.uid = opts.uid - if (opts.gid) mkdirOpts.gid = opts.gid - if (opts.mode) mkdirOpts.mode = opts.mode + const mkdirOpts = {} + if (opts.uid) mkdirOpts.uid = opts.uid + if (opts.gid) mkdirOpts.gid = opts.gid + if (opts.mode) mkdirOpts.mode = opts.mode - return new Promise((resolve, reject) => { - drive.mkdir(path, mkdirOpts, err => { - if (err) return reject(err) + return new Promise((resolve, reject) => { + drive.mkdir(path, mkdirOpts, err => { + if (err) return reject(err) - const rsp = new rpc.drive.messages.MkdirResponse() - return resolve(rsp) - }) - }) - }, + const rsp = new rpc.drive.messages.MkdirResponse() + return resolve(rsp) + }) + }) + } - rmdir: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() + async _rpcRmdir (call) { + const id = call.request.getId() + const path = call.request.getPath() - if (!id) throw new Error('A rmdir request must specify a session ID.') - if (!path) throw new Error('A rmdir request must specify a directory path.') - const drive = this.driveForSession(id) + if (!id) throw new Error('A rmdir request must specify a session ID.') + if (!path) throw new Error('A rmdir request must specify a directory path.') + const drive = this.driveForSession(id) - return new Promise((resolve, reject) => { - drive.rmdir(path, err => { - if (err) return reject(err) + return new Promise((resolve, reject) => { + drive.rmdir(path, err => { + if (err) return reject(err) - const rsp = new rpc.drive.messages.RmdirResponse() - return resolve(rsp) - }) - }) - }, + const rsp = new rpc.drive.messages.RmdirResponse() + return resolve(rsp) + }) + }) + } - mount: async (call) => { - const id = call.request.getId() - const mountInfo = call.request.getInfo() + async _rpcMount (call) { + const id = call.request.getId() + const mountInfo = call.request.getInfo() - const path = mountInfo.getPath() - const opts = fromMount(mountInfo.getOpts()) + const path = mountInfo.getPath() + const opts = fromMount(mountInfo.getOpts()) - if (!id) throw new Error('A mount request must specify a session ID.') - if (!path) throw new Error('A mount request must specify a path.') - if (!opts) throw new Error('A mount request must specify mount options.') - const drive = this.driveForSession(id) + if (!id) throw new Error('A mount request must specify a session ID.') + if (!path) throw new Error('A mount request must specify a path.') + if (!opts) throw new Error('A mount request must specify mount options.') + const drive = this.driveForSession(id) - return new Promise((resolve, reject) => { - let error = null - const mountListener = key => { - if (!opts.key || key.equals(opts.key)) { - this.removeListener('configured-mount', mountListener) - if (error) return - const rsp = new rpc.drive.messages.MountDriveResponse() - return resolve(rsp) - } - } - this.on('configured-mount', mountListener) - drive.mount(path, opts.key, opts, err => { - if (err) { - error = err - return reject(err) - } - if (opts.key && this._configuredMounts.has(opts.key.toString('hex'))) { - return mountListener(opts.key) - } - }) - }) - }, + return new Promise((resolve, reject) => { + let error = null + const mountListener = key => { + if (!opts.key || key.equals(opts.key)) { + this.removeListener('configured-mount', mountListener) + if (error) return + const rsp = new rpc.drive.messages.MountDriveResponse() + return resolve(rsp) + } + } + this.on('configured-mount', mountListener) + drive.mount(path, opts.key, opts, err => { + if (err) { + error = err + return reject(err) + } + if (opts.key && this._configuredMounts.has(opts.key.toString('hex'))) { + return mountListener(opts.key) + } + }) + }) + } - unmount: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() + async _rpcUnmount (call) { + const id = call.request.getId() + const path = call.request.getPath() - if (!id) throw new Error('An unmount request must specify a session ID.') - if (!path) throw new Error('An unmount request must specify a path.') - const drive = this.driveForSession(id) + if (!id) throw new Error('An unmount request must specify a session ID.') + if (!path) throw new Error('An unmount request must specify a path.') + const drive = this.driveForSession(id) - return new Promise((resolve, reject) => { - drive.unmount(path, err => { - if (err) return reject(err) - const rsp = new rpc.drive.messages.UnmountDriveResponse() - return resolve(rsp) - }) - }) - }, - - watch: async (call) => { - const self = this - var watcher = null - var closed = false - var driveWatchers = null - var keyString = null - - call.once('data', req => { - const id = req.getId() - var path = req.getPath() - - if (!id) throw new Error('A watch request must specify a session ID.') - if (!path) path = '/' - const drive = this.driveForSession(id) - keyString = drive.key.toString('hex') - - driveWatchers = this._watchers.get(keyString) - if (!driveWatchers) { - driveWatchers = [] - this._watchers.set(keyString, driveWatchers) - } + return new Promise((resolve, reject) => { + drive.unmount(path, err => { + if (err) return reject(err) + const rsp = new rpc.drive.messages.UnmountDriveResponse() + return resolve(rsp) + }) + }) + } - watcher = drive.watch(path, () => { - const rsp = new rpc.drive.messages.WatchResponse() - call.write(rsp) - }) + async _rpcWatch (call) { + const self = this + var watcher = null + var closed = false + var driveWatchers = null + var keyString = null + + call.once('data', req => { + const id = req.getId() + var path = req.getPath() + + if (!id) throw new Error('A watch request must specify a session ID.') + if (!path) path = '/' + const drive = this.driveForSession(id) + keyString = drive.key.toString('hex') + + driveWatchers = this._watchers.get(keyString) + if (!driveWatchers) { + driveWatchers = [] + this._watchers.set(keyString, driveWatchers) + } - const close = onclose.bind(null, id, path, driveWatchers) + watcher = drive.watch(path, () => { + const rsp = new rpc.drive.messages.WatchResponse() + call.write(rsp) + }) - watcher.once('ready', subWatchers => { - // Add one in order to include the root watcher. - this._watchCount += subWatchers.length + 1 - if (this._watchCount > this.watchLimit) { - return close('Watch limit reached. Please close watch connections then try again.') - } - driveWatchers.push(watcher) - - // Any subsequent messages are considered cancellations. - call.on('data', close) - call.on('close', close) - call.on('finish', close) - call.on('error', close) - call.on('end', close) - }) - }) + const close = onclose.bind(null, id, path, driveWatchers) - function onclose (id, path, driveWatchers, err) { - if (closed) return - closed = true - log.debug({ id, path }, 'unregistering watcher') - if (watcher) { - watcher.destroy() - if (watcher.watchers) self._watchCount -= (watcher.watchers.length + 1) - driveWatchers.splice(driveWatchers.indexOf(watcher), 1) - if (!driveWatchers.length) self._watchers.delete(keyString) - } - call.end() + watcher.once('ready', subWatchers => { + // Add one in order to include the root watcher. + this._watchCount += subWatchers.length + 1 + if (this._watchCount > this.watchLimit) { + return close('Watch limit reached. Please close watch connections then try again.') } - }, + driveWatchers.push(watcher) + + // Any subsequent messages are considered cancellations. + call.on('data', close) + call.on('close', close) + call.on('finish', close) + call.on('error', close) + call.on('end', close) + }) + }) + + function onclose (id, path, driveWatchers, err) { + if (closed) return + closed = true + log.debug({ id, path }, 'unregistering watcher') + if (watcher) { + watcher.destroy() + if (watcher.watchers) self._watchCount -= (watcher.watchers.length + 1) + driveWatchers.splice(driveWatchers.indexOf(watcher), 1) + if (!driveWatchers.length) self._watchers.delete(keyString) + } + call.end() + } + } - symlink: async (call) => { - const id = call.request.getId() - const target = call.request.getTarget() - const linkname = call.request.getLinkname() + async _rpcSymlink (call) { + const id = call.request.getId() + const target = call.request.getTarget() + const linkname = call.request.getLinkname() - if (!id) throw new Error('A symlink request must specify a session ID.') - if (!target) throw new Error('A symlink request must specify a target.') - if (!linkname) throw new Error('A symlink request must specify a linkname.') - const drive = this.driveForSession(id) + if (!id) throw new Error('A symlink request must specify a session ID.') + if (!target) throw new Error('A symlink request must specify a target.') + if (!linkname) throw new Error('A symlink request must specify a linkname.') + const drive = this.driveForSession(id) - return new Promise((resolve, reject) => { - drive.symlink(target, linkname, err => { - if (err) return reject(err) + return new Promise((resolve, reject) => { + drive.symlink(target, linkname, err => { + if (err) return reject(err) - const rsp = new rpc.drive.messages.SymlinkResponse() - return resolve(rsp) - }) - }) - }, + const rsp = new rpc.drive.messages.SymlinkResponse() + return resolve(rsp) + }) + }) + } - close: async (call) => { - const id = call.request.getId() + async _rpcClose (call) { + const id = call.request.getId() - this.driveForSession(id) - await this.closeSession(id) - const rsp = new rpc.drive.messages.CloseSessionResponse() + this.driveForSession(id) + await this.closeSession(id) + const rsp = new rpc.drive.messages.CloseSessionResponse() - return rsp - }, + return rsp + } - fileStats: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() + async _rpcFileStats (call) { + const id = call.request.getId() + const path = call.request.getPath() - if (!id) throw new Error('A fileStats request must specify a session ID.') - if (!path) throw new Error('A fileStats request must specify a path.') - const drive = this.driveForSession(id) + if (!id) throw new Error('A fileStats request must specify a session ID.') + if (!path) throw new Error('A fileStats request must specify a path.') + const drive = this.driveForSession(id) - return new Promise((resolve, reject) => { - drive.stats(path, (err, stats) => { - if (err) return reject(err) + return new Promise((resolve, reject) => { + drive.stats(path, (err, stats) => { + if (err) return reject(err) - if (!(stats instanceof Map)) { - const fileStats = stats - stats = new Map() - stats.set(path, fileStats) - } - const rsp = new rpc.drive.messages.FileStatsResponse() - setFileStats(rsp.getStatsMap(), stats) + if (!(stats instanceof Map)) { + const fileStats = stats + stats = new Map() + stats.set(path, fileStats) + } + const rsp = new rpc.drive.messages.FileStatsResponse() + setFileStats(rsp.getStatsMap(), stats) - return resolve(rsp) - }) - }) - }, + return resolve(rsp) + }) + }) + } - mounts: async (call) => { - const id = call.request.getId() - const memory = call.request.getMemory() - const recursive = call.request.getRecursive() + async _rpcMounts (call) { + const id = call.request.getId() + const memory = call.request.getMemory() + const recursive = call.request.getRecursive() - if (!id) throw new Error('A mounts request must specify a session ID.') - const drive = this.driveForSession(id) + if (!id) throw new Error('A mounts request must specify a session ID.') + const drive = this.driveForSession(id) - return new Promise((resolve, reject) => { - drive.getAllMounts({ memory, recursive }, (err, mounts) => { - if (err) return reject(err) - const rsp = new rpc.drive.messages.DriveMountsResponse() - if (!mounts) return resolve(rsp) - - const mountsList = [] - for (const [path, { metadata }] of mounts) { - mountsList.push(toMountInfo({ - path, - opts: { - key: metadata.key, - version: metadata.version - } - })) + return new Promise((resolve, reject) => { + drive.getAllMounts({ memory, recursive }, (err, mounts) => { + if (err) return reject(err) + const rsp = new rpc.drive.messages.DriveMountsResponse() + if (!mounts) return resolve(rsp) + + const mountsList = [] + for (const [path, { metadata }] of mounts) { + mountsList.push(toMountInfo({ + path, + opts: { + key: metadata.key, + version: metadata.version } - rsp.setMountsList(mountsList) - return resolve(rsp) - }) - }) - } - } + })) + } + rsp.setMountsList(mountsList) + return resolve(rsp) + }) + }) } } diff --git a/lib/peers/index.js b/lib/peers/index.js index b1e064a..40876fb 100644 --- a/lib/peers/index.js +++ b/lib/peers/index.js @@ -4,6 +4,7 @@ const eos = require('end-of-stream') const { rpc } = require('hyperdrive-daemon-client') const messages = rpc.peers.messages const WatchPeersTypes = messages.WatchPeersResponse.Type + const log = require('../log').child({ component: 'peers' }) const ALIAS = Symbol('hyperdrive-peer-alias') @@ -99,13 +100,4 @@ module.exports = class PeersManager extends EventEmitter { this._keysByAlias.set(alias, remoteKey) return alias } - - getHandlers () { - return { - listPeers: this._rpcListPeers.bind(this), - watchPeers: this._rpcWatchPeers.bind(this), - getAlias: this._rpcGetAlias.bind(this), - getKey: this._rpcGetKey.bind(this) - } - } } diff --git a/lib/peersockets/index.js b/lib/peersockets/index.js index 5e5ac73..1c36c1a 100644 --- a/lib/peersockets/index.js +++ b/lib/peersockets/index.js @@ -24,12 +24,6 @@ module.exports = class PeersocketsManager extends EventEmitter { const topicHandler = new TopicHandler(this, this.peersockets, this.peers, call) this.handles.push(topicHandler) } - - getHandlers () { - return { - join: this._rpcJoin.bind(this) - } - } } class TopicHandler { diff --git a/package.json b/package.json index 486b931..96e49d3 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ "hyperdrive": "./bin/run/run" }, "scripts": { - "test": "standard && NODE_ENV=test tape test/*.js" + "test": "NODE_ENV=test tape test/*.js" }, "files": [ "index.js", From 87350a097a84a60bcf9755c07821190310508709 Mon Sep 17 00:00:00 2001 From: Andrew Osheroff Date: Fri, 5 Jun 2020 15:53:48 +0200 Subject: [PATCH 12/28] Progress on API refactoring + mirroring --- index.js | 17 ++- lib/{cores/index.js => cores.js} | 0 lib/drives/index.js | 125 +++++-------------- lib/migrations/index.js | 0 lib/network.js | 93 ++++++++++++++ lib/{peers/index.js => peers.js} | 0 lib/{peersockets/index.js => peersockets.js} | 0 package.json | 2 + 8 files changed, 140 insertions(+), 97 deletions(-) rename lib/{cores/index.js => cores.js} (100%) create mode 100644 lib/migrations/index.js create mode 100644 lib/network.js rename lib/{peers/index.js => peers.js} (100%) rename lib/{peersockets/index.js => peersockets.js} (100%) diff --git a/index.js b/index.js index 77e16a0..d1b869e 100644 --- a/index.js +++ b/index.js @@ -16,6 +16,8 @@ const { rpc, apiVersion } = require('hyperdrive-daemon-client') const { createMetadata } = require('./lib/metadata') const constants = require('hyperdrive-daemon-client/lib/constants') +const NetworkManager = require('./lib/network') +const CoreManager = require('./lib/cores') const DriveManager = require('./lib/drives') const PeersocketManager = require('./lib/peersockets') const PeersManager = require('./lib/peers') @@ -139,8 +141,9 @@ class HyperdriveDaemon extends EventEmitter { this.db = this._dbProvider(`${this.storage}/db`, { valueEncoding: 'json' }) const dbs = { fuse: sub(this.db, 'fuse', { valueEncoding: bjson }), + cores: sub(this.db, 'cores', { valueEncoding: bjson }), drives: sub(this.db, 'drives', { valueEncoding: bjson }), - profiles: sub(this.db, 'profiles', { valueEncoding: 'json' }) + network: sub(this.db, 'network', { valueEncoding: 'json'}) } this._dbs = dbs @@ -174,7 +177,10 @@ class HyperdriveDaemon extends EventEmitter { this.peersockets = new PeersocketManager(this.networking, this.peers, peersockets) if (!this.noDebug) this.debug = new DebugManager(this) - this.drives = new DriveManager(this.corestore, this.networking, dbs.drives, { + this.network = new NetworkManager(this.networking, dbs.network) + await this.network.ready() + + this.drives = new DriveManager(this.corestore, this.network, dbs.drives, { ...this.opts, memoryOnly: this.memoryOnly, watchLimit: this.opts.watchLimit || WATCH_LIMIT @@ -182,6 +188,13 @@ class HyperdriveDaemon extends EventEmitter { this.drives.on('error', err => this.emit('error', err)) await this.drives.ready() + this.cores = new CoreManager(this.corestore, this.network, dbs.cores, { + ...this.opts, + memoryOnly: this.memoryOnly + }) + this.cores.on('error', err => this.emit('error', err)) + await this.cores.ready() + this.fuse = new FuseManager(this.drives, this._dbs.fuse, this.opts) this.fuse.on('error', err => this.emit('error', err)) await this.fuse.ready() diff --git a/lib/cores/index.js b/lib/cores.js similarity index 100% rename from lib/cores/index.js rename to lib/cores.js diff --git a/lib/drives/index.js b/lib/drives/index.js index 55bc42f..66e7c09 100644 --- a/lib/drives/index.js +++ b/lib/drives/index.js @@ -1,12 +1,7 @@ -const { EventEmitter } = require('events') - const hyperdrive = require('hyperdrive') -const hypercoreCrypto = require('hypercore-crypto') -const collectStream = require('stream-collector') -const sub = require('subleveldown') -const bjson = require('buffer-json-encoding') const datEncoding = require('dat-encoding') const pump = require('pump') +const Nanoresource = require('nanoresource-promise/emitter') const { Transform } = require('streamx') const { @@ -33,59 +28,46 @@ const log = require('../log').child({ component: 'drive-manager' }) const TRIE_UPDATER_SYMBOL = Symbol('hyperdrive-daemon-trie-updater') -class DriveManager extends EventEmitter { - constructor (cores, networking, db, opts = {}) { +class DriveManager extends Nanoresource { + constructor (corestore, networking, db, opts = {}) { super() - this.cores = cores + this.corestore = corestore this.networking = networking this.db = db this.opts = opts this.watchLimit = opts.watchLimit - this.noAnnounce = !!opts.noAnnounce this.memoryOnly = !!opts.memoryOnly this._driveIndex = sub(this.db, 'drives', { valueEncoding: bjson }) - this._seedIndex = sub(this.db, 'seeding', { valueEncoding: 'json' }) this._mirrorIndex = sub(this.db, 'mirrors', { valueEncoding: 'utf8' }) this._drives = new Map() this._checkouts = new Map() this._watchers = new Map() this._sessionsByKey = new Map() - this._transientSeedIndex = new Map() this._configuredMounts = new Set() this._sessions = new ArrayIndex() this._downloads = new ArrayIndex() - this._mirrorRanges = new Map() - this._mirrorSessions = new Map() + this._mirrors = new Map() this._watchCount = 0 + } - this._readyPromise = null - this.ready = () => { - if (this._readyPromise) return this._readyPromise - this._readyPromise = Promise.all([ - this._remirror() - ]) - return this._readyPromise - } + ready () { + return this.open() } - async _rejoin () { - if (this.noAnnounce) return - const driveList = await collect(this._seedIndex) - for (const { key: discoveryKey, value: networkOpts } of driveList) { - const opts = networkOpts && networkOpts.opts - if (!opts || !opts.announce) continue - this.networking.join(discoveryKey, { ...networkOpts.opts }) - } + async _open () { + return Promise.all([ + this._remirror() + ]) } async _remirror () { const mirrorList = await collect(this._mirrorIndex) for (const { key } of mirrorList) { const drive = await this.get({ key }) - await this.configureDrive(drive, { sparse: false }) + await this._startMirroring(drive) } } @@ -97,57 +79,24 @@ class DriveManager extends EventEmitter { } async _startMirroring (drive) { - const driveKey = drive.key.toString('hex') - const mounts = await new Promise((resolve, reject) => { - drive.getAllMounts({ memory: false, recursive: true }, (err, mounts) => { - if (err) return reject(err) - return resolve(mounts) - }) - }) - // A mirrored drive should never be closed. const { session: mirrorSession } = await this.createSession(drive) - this._mirrorSessions.set(driveKey, mirrorSession) - - for (const [, { metadata, content }] of mounts) { - const metadataKey = metadata.key.toString('hex') - const contentKey = content.key.toString('hex') - if (!this._mirrorRanges.has(metadataKey)) { - const range = metadata.download() - this._mirrorRanges.set(metadataKey, { core: metadata, range }) - } - if (!this._mirrorRanges.has(contentKey)) { - const range = content.download() - this._mirrorRanges.set(contentKey, { core: content, range }) - } - } + const unmirror = drive.mirror() + this._mirrors.set(drive.key.toString('hex'), { + session: mirrorSession, + unmirror + }) + await this._mirrorIndex.put() + log.info({ discoveryKey: drive.discoveryKey.toString('hex') }, 'mirroring drive') } async _stopMirroring (drive) { const driveKey = drive.key.toString('hex') - const mounts = await new Promise((resolve, reject) => { - drive.getAllMounts({ memory: false, recursive: true }, (err, mounts) => { - if (err) return reject(err) - return resolve(mounts) - }) - }) - for (const [, { metadata, content }] of mounts) { - const metadataKey = metadata.key.toString('hex') - const contentKey = content.key.toString('hex') - if (this._mirrorRanges.has(metadataKey)) { - const { core, range } = this._mirrorRanges.get(metadataKey) - core.undownload(range) - this._mirrorRanges.delete(metadataKey) - } - if (this._mirrorRanges.has(contentKey)) { - const { core, range } = this._mirrorRanges.get(contentKey) - core.undownload(range) - this._mirrorRanges.delete(contentKey) - } - } - if (this._mirrorSessions.has(driveKey)) { - await this.closeSession(this._mirrorSessions.get(driveKey)) - } + const mirrorInfo = this._mirrors.get(driveKey) + if (!mirrorInfo) return + this._mirrors.delete(driveKey) + mirrorInfo.unmirror() + this.closeSession(mirrorInfo.session) } driveForSession (sessionId) { @@ -280,12 +229,6 @@ class DriveManager extends EventEmitter { return collect(this._driveIndex) } - async getAllNetworkConfigurations () { - const storedConfigurations = (await collect(this._seedIndex)).map(({ key, value }) => [key, value]) - const transientConfigurations = [...this._transientSeedIndex] - return new Map([...storedConfigurations, ...transientConfigurations]) - } - async get (key, opts = {}) { key = (key instanceof Buffer) ? datEncoding.decode(key) : key var keyString = this._generateKeyString(key, opts) @@ -310,7 +253,7 @@ class DriveManager extends EventEmitter { if (!drive) { const randomNamespace = hypercoreCrypto.randomBytes(32).toString('hex') - drive = hyperdrive(this.cores.corestore, key, { + drive = hyperdrive(this.corestore, key, { namespace: randomNamespace, ...driveOpts }) @@ -358,10 +301,10 @@ class DriveManager extends EventEmitter { if (opts.fuseNetwork) { // TODO: The Network drive does not announce or remember any settings for now. initialConfig = { lookup: true, announce: false, remember: false } - await this.configureNetwork(drive.metadata, initialConfig) + await this.networking.configure(drive.metadata, initialConfig) } else if (!drive.writable || opts.seed) { initialConfig = { lookup: true, announce: false, remember: true } - await this.configureNetwork(drive.metadata, initialConfig) + await this.networking.configure(drive.metadata, initialConfig) } // Make sure that any inner mounts are recorded in the drive index. @@ -379,7 +322,7 @@ class DriveManager extends EventEmitter { remember: true } - if (mountConfig) await this.configureNetwork(feed, mountConfig) + if (mountConfig) await this.cores.configureNetwork(feed, mountConfig) this.emit('configured-mount', feed.key) this._configuredMounts.add(mountKey) try { @@ -421,14 +364,6 @@ class DriveManager extends EventEmitter { return checkout || drive } - async configureDrive (drive, opts = {}) { - if (opts.sparse === false) { - await this._startMirroring(drive) - } else { - await this._stopMirroring(drive) - } - } - download (drive, path) { const dl = drive.download(path) return this._downloads.insert(dl) @@ -475,7 +410,7 @@ class DriveManager extends EventEmitter { } async _rpcAllNetworkConfigurations (call) { - const networkConfigurations = await this.getAllNetworkConfigurations() + const networkConfigurations = await this.network.getAllNetworkConfigurations() const rsp = new rpc.drive.messages.NetworkConfigurationsResponse() rsp.setConfigurationsList([...networkConfigurations].map(([, value]) => toNetworkConfiguration({ diff --git a/lib/migrations/index.js b/lib/migrations/index.js new file mode 100644 index 0000000..e69de29 diff --git a/lib/network.js b/lib/network.js new file mode 100644 index 0000000..55154ef --- /dev/null +++ b/lib/network.js @@ -0,0 +1,93 @@ +const sub = require('subleveldown') +const bjson = require('buffer-json-encoding') +const datEncoding = require('dat-encoding') +const Nanoresource = require('nanoresource-promise/emitter') + +const log = require('../log').child({ component: 'network-manager' }) + +class NetworkManager extends Nanoresource { + constructor (networking, db, opts = {}) { + super() + this.networking = networking + this.db = db + this.noAnnounce = !!opts.noAnnounce + + this._seedIndex = sub(this.db, 'seeding', { valueEncoding: 'json' }) + } + + ready () { + return this.open() + } + + async _open () { + return Promise.all([ + this._rejoin() + ]) + } + + async _rejoin () { + if (this.noAnnounce) return + const seedList = await collect(this._seedIndex) + for (const { key: discoveryKey, value: networkOpts } of seedList) { + const opts = networkOpts && networkOpts.opts + if (!opts || !opts.announce) continue + this.networking.join(discoveryKey, { ...networkOpts.opts, loadForLength: true }) + } + } + + async configure (discoveryKey, opts = {}) { + const self = this + const encodedKey = datEncoding.encode(discoveryKey) + const networkOpts = { + lookup: !!opts.lookup, + announce: !!opts.announce, + } + const seeding = opts.lookup || opts.announce + var networkingPromise + + const sameConfig = sameNetworkConfig(discoveryKey, opts) + // If all the networking options are the same, exit early. + if (sameConfig) return + + const networkConfig = { opts: networkOpts } + if (seeding) await this._seedIndex.put(encodedKey, networkConfig) + else await this._seedIndex.del(encodedKey) + + // Failsafe + if (networkOpts.announce && this.noAnnounce) networkOpts.announce = false + + try { + if (seeding) { + networkingPromise = this.networking.join(discoveryKey, networkOpts) + } else { + networkingPromise = this.networking.leave(discoveryKey) + } + networkingPromise.then(configurationSuccess) + networkingPromise.catch(configurationError) + } catch (err) { + configurationError(err) + } + + function sameNetworkConfig (discoveryKey, opts = {}) { + const swarmStatus = self.networking.status(discoveryKey) + if (!swarmStatus) return opts.lookup === false && opts.announce === false + return swarmStatus.announce === opts.announce && swarmStatus.lookup === opts.lookup + } + + function configurationError (err) { + log.error({ err, discoveryKey: encodedKey }, 'network configuration error') + } + + function configurationSuccess () { + log.debug({ discoveryKey: encodedKey }, 'network configuration succeeded') + } + } + + async getAllNetworkConfigurations () { + const storedConfigurations = (await collect(this._seedIndex)).map(({ key, value }) => [key, value]) + return new Map(storedConfigurations) + } +} + +module.exports = NetworkManager + diff --git a/lib/peers/index.js b/lib/peers.js similarity index 100% rename from lib/peers/index.js rename to lib/peers.js diff --git a/lib/peersockets/index.js b/lib/peersockets.js similarity index 100% rename from lib/peersockets/index.js rename to lib/peersockets.js diff --git a/package.json b/package.json index 96e49d3..cb6975d 100644 --- a/package.json +++ b/package.json @@ -39,6 +39,7 @@ "@oclif/plugin-autocomplete": "^0.1.5", "@oclif/plugin-help": "^2.2.3", "buffer-json-encoding": "^1.0.2", + "call-me-maybe": "^1.0.1", "corestore": "^5.0.0", "corestore-swarm-networking": "^5.0.0", "dat-encoding": "^5.0.1", @@ -57,6 +58,7 @@ "level-mem": "^5.0.1", "minimist": "^1.2.5", "mkdirp": "^0.5.1", + "nanoresource-promise": "^1.2.2", "ora": "^4.0.3", "peersockets": "^0.3.0", "pino": "^5.12.6", From ca83fadfcec4e1859776a8ba5a290dc6168402f6 Mon Sep 17 00:00:00 2001 From: Andrew Osheroff Date: Sat, 6 Jun 2020 18:34:02 +0200 Subject: [PATCH 13/28] Adding migrations --- index.js | 7 +++ lib/common.js | 24 +++++++++- lib/drives/index.js | 98 ++++++++++++++++++++--------------------- lib/migrations/index.js | 24 ++++++++++ lib/network.js | 34 +++++++++++++- package.json | 3 +- 6 files changed, 135 insertions(+), 55 deletions(-) diff --git a/index.js b/index.js index d1b869e..5106152 100644 --- a/index.js +++ b/index.js @@ -6,6 +6,7 @@ const sub = require('subleveldown') const grpc = require('@grpc/grpc-js') const bjson = require('buffer-json-encoding') const processTop = require('process-top') +const varint = require('varint') const Corestore = require('corestore') const HypercoreCache = require('hypercore-cache') const SwarmNetworker = require('corestore-swarm-networking') @@ -16,6 +17,7 @@ const { rpc, apiVersion } = require('hyperdrive-daemon-client') const { createMetadata } = require('./lib/metadata') const constants = require('hyperdrive-daemon-client/lib/constants') +const Migrations = require('./lib/migrations') const NetworkManager = require('./lib/network') const CoreManager = require('./lib/cores') const DriveManager = require('./lib/drives') @@ -96,6 +98,7 @@ class HyperdriveDaemon extends EventEmitter { if (opts.latency !== undefined) this._networkOpts.latency = +opts.latency // Set in ready. + this.migrations = null this.networking = null this.db = null this.drives = null @@ -140,6 +143,7 @@ class HyperdriveDaemon extends EventEmitter { this.db = this._dbProvider(`${this.storage}/db`, { valueEncoding: 'json' }) const dbs = { + migrations: sub(this.db, 'migrations', { valueEncoding: varint }), fuse: sub(this.db, 'fuse', { valueEncoding: bjson }), cores: sub(this.db, 'cores', { valueEncoding: bjson }), drives: sub(this.db, 'drives', { valueEncoding: bjson }), @@ -149,6 +153,9 @@ class HyperdriveDaemon extends EventEmitter { await this.corestore.ready() + this.migrations = new Migrations(dbs) + await this.migrations.ensureMigrated() + const seed = this.corestore._deriveSecret(NAMESPACE, 'replication-keypair') const swarmId = this.corestore._deriveSecret(NAMESPACE, 'swarm-id') this._networkOpts.keyPair = HypercoreProtocol.keyPair(seed) diff --git a/lib/common.js b/lib/common.js index 18bae12..a33705c 100644 --- a/lib/common.js +++ b/lib/common.js @@ -1,3 +1,5 @@ +const collectStream = require('stream-collector') + function getHandlers (manager) { const handlers = {} const rpcMethods = Object.getOwnPropertyNames(manager.__proto__).filter(methodName => methodName.startsWith('_rpc')) @@ -9,6 +11,26 @@ function getHandlers (manager) { return handlers } +function dbCollect (index, opts) { + return new Promise((resolve, reject) => { + collectStream(index.createReadStream(opts), (err, list) => { + if (err) return reject(err) + return resolve(list) + }) + }) +} + +async function dbGet (db, idx) { + try { + return await db.get(idx) + } catch (err) { + if (err && !err.notFound) throw err + return null + } +} + module.exports = { - getHandlers + getHandlers, + dbCollect, + dbGet } diff --git a/lib/drives/index.js b/lib/drives/index.js index 66e7c09..d0b419e 100644 --- a/lib/drives/index.js +++ b/lib/drives/index.js @@ -1,6 +1,9 @@ const hyperdrive = require('hyperdrive') const datEncoding = require('dat-encoding') const pump = require('pump') +const sub = require('subleveldown') +const bjson = require('buffer-json-encoding') +const collectStream = require('stream-collector') const Nanoresource = require('nanoresource-promise/emitter') const { Transform } = require('streamx') @@ -17,30 +20,34 @@ const { toMountInfo, toDriveStats, toDiffEntry, - toNetworkConfiguration, setFileStats, toChunks } = require('hyperdrive-daemon-client/lib/common') const { rpc } = require('hyperdrive-daemon-client') -const ArrayIndex = require('./array-index.js') +const ArrayIndex = require('./array-index') +const { dbCollect, dbGet } = require('../common') const log = require('../log').child({ component: 'drive-manager' }) const TRIE_UPDATER_SYMBOL = Symbol('hyperdrive-daemon-trie-updater') class DriveManager extends Nanoresource { - constructor (corestore, networking, db, opts = {}) { + constructor (corestore, network, db, opts = {}) { super() this.corestore = corestore - this.networking = networking + this.network = network this.db = db this.opts = opts this.watchLimit = opts.watchLimit this.memoryOnly = !!opts.memoryOnly - this._driveIndex = sub(this.db, 'drives', { valueEncoding: bjson }) - this._mirrorIndex = sub(this.db, 'mirrors', { valueEncoding: 'utf8' }) + const dbs = DriveManager.SUB_DBS.reduce((acc, dbInfo) => { + acc[dbInfo.prefix] = sub(this.db, dbInfo.prefix, { valueEncoding: dbInfo.valueEncoding }) + }) + + this._driveIndex = dbs.drives + this._mirrorIndex = dbs.mirroring this._drives = new Map() this._checkouts = new Map() @@ -64,9 +71,9 @@ class DriveManager extends Nanoresource { } async _remirror () { - const mirrorList = await collect(this._mirrorIndex) + const mirrorList = await dbCollect(this._mirrorIndex) for (const { key } of mirrorList) { - const drive = await this.get({ key }) + const drive = await this.get(key) await this._startMirroring(drive) } } @@ -82,21 +89,24 @@ class DriveManager extends Nanoresource { // A mirrored drive should never be closed. const { session: mirrorSession } = await this.createSession(drive) const unmirror = drive.mirror() - this._mirrors.set(drive.key.toString('hex'), { + const driveKey = drive.key.toString('hex') + this._mirrors.set(driveKey, { session: mirrorSession, unmirror }) - await this._mirrorIndex.put() + // Only the key is relevant, but gets for valid keys shouldn't return null. + await this._mirrorIndex.put(driveKey, 'mirroring') log.info({ discoveryKey: drive.discoveryKey.toString('hex') }, 'mirroring drive') } async _stopMirroring (drive) { const driveKey = drive.key.toString('hex') const mirrorInfo = this._mirrors.get(driveKey) - if (!mirrorInfo) return + if (!mirrorInfo) return null this._mirrors.delete(driveKey) mirrorInfo.unmirror() this.closeSession(mirrorInfo.session) + return this._mirrorIndex.del(driveKey) } driveForSession (sessionId) { @@ -226,7 +236,7 @@ class DriveManager extends Nanoresource { } listDrives () { - return collect(this._driveIndex) + return dbCollect(this._driveIndex) } async get (key, opts = {}) { @@ -301,10 +311,10 @@ class DriveManager extends Nanoresource { if (opts.fuseNetwork) { // TODO: The Network drive does not announce or remember any settings for now. initialConfig = { lookup: true, announce: false, remember: false } - await this.networking.configure(drive.metadata, initialConfig) + await this.network.configure(drive.metadata, initialConfig) } else if (!drive.writable || opts.seed) { initialConfig = { lookup: true, announce: false, remember: true } - await this.networking.configure(drive.metadata, initialConfig) + await this.network.configure(drive.metadata, initialConfig) } // Make sure that any inner mounts are recorded in the drive index. @@ -314,27 +324,20 @@ class DriveManager extends Nanoresource { const mountKey = feed.key.toString('hex') log.info({ discoveryKey: feed.discoveryKey.toString('hex') }, 'registering mountpoint in drive index') - const parentConfig = (await this.getNetworkConfiguration(drive)) || initialConfig || {} - const existingMountConfig = (await this.getNetworkConfiguration(feed)) || {} + const parentConfig = (await this.network.getConfiguration(drive.discoveryKey)) || initialConfig || {} + const existingMountConfig = (await this.network.getConfiguration(feed.discoveryKey)) || {} const mountConfig = { lookup: (existingMountConfig.lookup !== false) && (parentConfig.lookup !== false), announce: !!(existingMountConfig.announce || parentConfig.announce), remember: true } - if (mountConfig) await this.cores.configureNetwork(feed, mountConfig) + if (mountConfig) await this.network.configure(feed.discoveryKey, mountConfig) this.emit('configured-mount', feed.key) this._configuredMounts.add(mountKey) - try { - await this._driveIndex.get(mountKey) - } catch (err) { - if (err && !err.notFound) log.error({ error: err }, 'error registering mountpoint in drive index') - try { - await this._driveIndex.put(mountKey, mountInfo) - } catch (err) { - log.error({ error: err }, 'could not register mountpoint in drive index') - } - } + + const existingConfig = await dbGet(this._driveIndex, mountKey) + if (!existingConfig) await this._driveIndex.put(mountKey, mountInfo) } drive.on('mount', mountListener) unlisteners.push(() => drive.removeAllListeners('mount')) @@ -410,7 +413,8 @@ class DriveManager extends Nanoresource { } async _rpcAllNetworkConfigurations (call) { - const networkConfigurations = await this.network.getAllNetworkConfigurations() + return this.network._rpcAllConfigurations(call) + const networkConfigurations = await this.network.getAllConfigurations() const rsp = new rpc.drive.messages.NetworkConfigurationsResponse() rsp.setConfigurationsList([...networkConfigurations].map(([, value]) => toNetworkConfiguration({ @@ -442,19 +446,6 @@ class DriveManager extends Nanoresource { return rsp } - async _rpcConfigure (call) { - const id = call.request.getId() - - if (!id) throw new Error('A configuration request must specify a session ID.') - const drive = this.driveForSession(id) - const opts = fromDriveConfiguration(call.request.getConfig()) - - await this.configureDrive(drive, { ...opts }) - - const rsp = new rpc.drive.messages.ConfigureDriveResponse() - return rsp - } - async _rpcConfigureNetwork (call) { const id = call.request.getId() @@ -462,7 +453,7 @@ class DriveManager extends Nanoresource { const drive = this.driveForSession(id) const opts = fromNetworkConfiguration(call.request.getNetwork()) - await this.configureNetwork(drive.metadata, { ...opts }) + await this.network.configure(drive.metadata.discoveryKey, { ...opts }) const rsp = new rpc.drive.messages.ConfigureNetworkResponse() return rsp @@ -477,7 +468,7 @@ class DriveManager extends Nanoresource { const recursive = call.request.getRecursive() const networkingOnly = call.request.getNetworkingonly() const driveStats = await this.getDriveStats(drive, { recursive, networkingOnly }) - const networkConfig = await this.getNetworkConfiguration(drive) + const networkConfig = await this.networking.getConfiguration(drive.discoveryKey) const rsp = new rpc.drive.messages.DriveStatsResponse() rsp.setStats(toDriveStats(driveStats)) @@ -1028,14 +1019,19 @@ class DriveManager extends Nanoresource { }) } } - -function collect (index, opts) { - return new Promise((resolve, reject) => { - collectStream(index.createReadStream(opts), (err, list) => { - if (err) return reject(err) - return resolve(list) - }) - }) +DriveManager.SUB_DBS = { + drives: { + prefix: 'drives', + valueEncoding: 'bjson' + }, + mirroring: { + prefix: 'mirrors', + valueEncoding: 'utf8' + }, + seeding: { + prefix: 'seeding', + valueEncoding: 'json' + } } module.exports = DriveManager diff --git a/lib/migrations/index.js b/lib/migrations/index.js index e69de29..8c8fc26 100644 --- a/lib/migrations/index.js +++ b/lib/migrations/index.js @@ -0,0 +1,24 @@ +const { dbGet } = require('../common') + +const LAST_MIGRATION_KEY = 'last_migration' +const migrations = [ + require('./1-move-seeding-db.js') +] + +class Migrations { + constructor (dbs) { + this.dbs = dbs + } + + async ensureMigrated () { + var lastMigration = await dbGet(this.dbs.migrations, LAST_MIGRATION_KEY) + for (let i = 0; i < migrations.length; i++) { + if (lastMigration && (i < lastMigration)) continue + await migrations[i](this.dbs) + lastMigration = i + 1 + await this.dbs.migrations.put(LAST_MIGRATION_KEY, lastMigration) + } + } +} + +module.export = Migrations diff --git a/lib/network.js b/lib/network.js index 55154ef..9baf937 100644 --- a/lib/network.js +++ b/lib/network.js @@ -3,6 +3,13 @@ const bjson = require('buffer-json-encoding') const datEncoding = require('dat-encoding') const Nanoresource = require('nanoresource-promise/emitter') +const { + fromNetworkConfiguration, + toNetworkConfiguration +} = require('hyperdrive-daemon-client/lib/common') +const { rpc } = require('hyperdrive-daemon-client') + +const { dbGet, dbCollect } = require('./common') const log = require('../log').child({ component: 'network-manager' }) class NetworkManager extends Nanoresource { @@ -83,10 +90,33 @@ class NetworkManager extends Nanoresource { } } - async getAllNetworkConfigurations () { - const storedConfigurations = (await collect(this._seedIndex)).map(({ key, value }) => [key, value]) + async getConfiguration (discoveryKey) { + const networkOpts = await dbGet(this._seedIndex, datEncoding.encode(discoveryKey)) + return networkOpts ? networkOpts.opts : null + } + + async getAllConfigurations () { + const storedConfigurations = (await dbCollect(this._seedIndex)).map(({ key, value }) => [key, value]) return new Map(storedConfigurations) } + + async _rpcAllNetworkConfigurations (call) { + const networkConfigurations = await this.getAllConfigurations() + + const rsp = new rpc.drive.messages.NetworkConfigurationsResponse() + rsp.setConfigurationsList([...networkConfigurations].map(([, value]) => toNetworkConfiguration({ + ...value.opts, + key: Buffer.from(value.key, 'hex') + }))) + + return rsp + } +} +NetworkManager.SUB_DBS = { + seeding: { + prefix: 'seeding', + valueEncoding: 'json' + } } module.exports = NetworkManager diff --git a/package.json b/package.json index cb6975d..bb37afd 100644 --- a/package.json +++ b/package.json @@ -69,7 +69,8 @@ "random-access-memory": "^3.1.1", "stream-collector": "^1.0.1", "streamx": "^2.6.0", - "subleveldown": "^4.0.0" + "subleveldown": "^4.0.0", + "varint": "^5.0.0" }, "optionalDependencies": { "fuse-native": "^2.2.1", From 36cbc73d82a9e107b3abbb5af21a023b69824924 Mon Sep 17 00:00:00 2001 From: Andrew Osheroff Date: Tue, 9 Jun 2020 15:36:08 +0200 Subject: [PATCH 14/28] More refactoring progress --- index.js | 9 ---- lib/cores.js | 96 ----------------------------------------- lib/drives/index.js | 27 +++++------- lib/migrations/index.js | 2 +- lib/network.js | 16 +++---- lib/peers.js | 2 +- lib/peersockets.js | 2 +- 7 files changed, 21 insertions(+), 133 deletions(-) delete mode 100644 lib/cores.js diff --git a/index.js b/index.js index 5106152..fa0e8ff 100644 --- a/index.js +++ b/index.js @@ -19,7 +19,6 @@ const constants = require('hyperdrive-daemon-client/lib/constants') const Migrations = require('./lib/migrations') const NetworkManager = require('./lib/network') -const CoreManager = require('./lib/cores') const DriveManager = require('./lib/drives') const PeersocketManager = require('./lib/peersockets') const PeersManager = require('./lib/peers') @@ -145,7 +144,6 @@ class HyperdriveDaemon extends EventEmitter { const dbs = { migrations: sub(this.db, 'migrations', { valueEncoding: varint }), fuse: sub(this.db, 'fuse', { valueEncoding: bjson }), - cores: sub(this.db, 'cores', { valueEncoding: bjson }), drives: sub(this.db, 'drives', { valueEncoding: bjson }), network: sub(this.db, 'network', { valueEncoding: 'json'}) } @@ -195,13 +193,6 @@ class HyperdriveDaemon extends EventEmitter { this.drives.on('error', err => this.emit('error', err)) await this.drives.ready() - this.cores = new CoreManager(this.corestore, this.network, dbs.cores, { - ...this.opts, - memoryOnly: this.memoryOnly - }) - this.cores.on('error', err => this.emit('error', err)) - await this.cores.ready() - this.fuse = new FuseManager(this.drives, this._dbs.fuse, this.opts) this.fuse.on('error', err => this.emit('error', err)) await this.fuse.ready() diff --git a/lib/cores.js b/lib/cores.js deleted file mode 100644 index 6161a1c..0000000 --- a/lib/cores.js +++ /dev/null @@ -1,96 +0,0 @@ -const { EventEmitter } = require('events') -const sub = require('subleveldown') -const bjson = require('buffer-json-encoding') -const datEncoding = require('dat-encoding') - -const log = require('../log').child({ component: 'core-manager' }) - -class CoreManager extends EventEmitter { - constructor (corestore, networking, db, opts = {}) { - super() - - this.corestore = corestore - this.networking = networking - this.db = db - - this._seedIndex = sub(this.db, 'seeding', { valueEncoding: 'json' }) - this._mirrorIndex = sub(this.db, 'mirrors', { valueEncoding: 'utf8' }) - } - - async _rejoin () { - if (this.noAnnounce) return - const seedList = await collect(this._seedIndex) - for (const { key: discoveryKey, value: networkOpts } of seedList) { - const opts = networkOpts && networkOpts.opts - if (!opts || !opts.announce) continue - this.networking.join(discoveryKey, { ...networkOpts.opts, loadForLength: true }) - } - } - - async configureNetwork (feed, opts = {}) { - const self = this - const encodedKey = datEncoding.encode(feed.discoveryKey) - const networkOpts = { - lookup: !!opts.lookup, - announce: !!opts.announce, - remember: !!opts.remember - } - const seeding = opts.lookup || opts.announce - var networkingPromise - - const sameConfig = sameNetworkConfig(feed.discoveryKey, opts) - // If all the networking options are the same, exit early. - if (sameConfig) return - - const networkConfig = { key: datEncoding.encode(feed.key), opts: networkOpts } - if (opts.remember) { - if (seeding) await this._seedIndex.put(encodedKey, networkConfig) - else await this._seedIndex.del(encodedKey) - } else { - this._transientSeedIndex.set(encodedKey, networkConfig) - } - - // Failsafe - if (networkOpts.announce && this.noAnnounce) networkOpts.announce = false - - try { - if (seeding) { - networkingPromise = this.networking.join(feed.discoveryKey, networkOpts) - } else { - networkingPromise = this.networking.leave(feed.discoveryKey) - } - networkingPromise.then(configurationSuccess) - networkingPromise.catch(configurationError) - } catch (err) { - configurationError(err) - } - - function sameNetworkConfig (discoveryKey, opts = {}) { - const swarmStatus = self.networking.status(discoveryKey) - if (!swarmStatus) return opts.lookup === false && opts.announce === false - return swarmStatus.announce === opts.announce && swarmStatus.lookup === opts.lookup - } - - function configurationError (err) { - log.error({ err, discoveryKey: encodedKey }, 'network configuration error') - } - - function configurationSuccess () { - log.debug({ discoveryKey: encodedKey }, 'network configuration succeeded') - } - } - - async getNetworkConfiguration (drive) { - const encodedKey = datEncoding.encode(drive.discoveryKey) - const networkOpts = this._transientSeedIndex.get(encodedKey) - if (networkOpts) return networkOpts.opts - try { - const persistentOpts = await this._seedIndex.get(encodedKey) - return persistentOpts.opts - } catch (err) { - return null - } - } -} - -module.exports = CoreManager diff --git a/lib/drives/index.js b/lib/drives/index.js index d0b419e..c1828c2 100644 --- a/lib/drives/index.js +++ b/lib/drives/index.js @@ -4,9 +4,10 @@ const pump = require('pump') const sub = require('subleveldown') const bjson = require('buffer-json-encoding') const collectStream = require('stream-collector') -const Nanoresource = require('nanoresource-promise/emitter') +const { NanoresourcePromise: Nanoresource } = require('nanoresource-promise/emitter') const { Transform } = require('streamx') + const { fromHyperdriveOptions, fromStat, @@ -42,12 +43,10 @@ class DriveManager extends Nanoresource { this.watchLimit = opts.watchLimit this.memoryOnly = !!opts.memoryOnly - const dbs = DriveManager.SUB_DBS.reduce((acc, dbInfo) => { - acc[dbInfo.prefix] = sub(this.db, dbInfo.prefix, { valueEncoding: dbInfo.valueEncoding }) - }) + const dbs = DriveManager.generateSubDbs(db) this._driveIndex = dbs.drives - this._mirrorIndex = dbs.mirroring + this._mirrorIndex = dbs.mirrors this._drives = new Map() this._checkouts = new Map() @@ -1019,18 +1018,12 @@ class DriveManager extends Nanoresource { }) } } -DriveManager.SUB_DBS = { - drives: { - prefix: 'drives', - valueEncoding: 'bjson' - }, - mirroring: { - prefix: 'mirrors', - valueEncoding: 'utf8' - }, - seeding: { - prefix: 'seeding', - valueEncoding: 'json' + +DriveManager.generateSubDbs = function (db) { + return { + drives: sub(db, 'drives', { valueEncoding: 'bjson' }), + mirrors: sub(db, 'mirrors', { valueEncoding: 'utf8' }), + seeding: sub(db, 'seeding', { valueEncoding: 'json '}) } } diff --git a/lib/migrations/index.js b/lib/migrations/index.js index 8c8fc26..e0227ff 100644 --- a/lib/migrations/index.js +++ b/lib/migrations/index.js @@ -21,4 +21,4 @@ class Migrations { } } -module.export = Migrations +module.exports = Migrations diff --git a/lib/network.js b/lib/network.js index 9baf937..fdd6274 100644 --- a/lib/network.js +++ b/lib/network.js @@ -1,7 +1,7 @@ const sub = require('subleveldown') const bjson = require('buffer-json-encoding') const datEncoding = require('dat-encoding') -const Nanoresource = require('nanoresource-promise/emitter') +const { NanoresourcePromise: Nanoresource } = require('nanoresource-promise/emitter') const { fromNetworkConfiguration, @@ -10,7 +10,7 @@ const { const { rpc } = require('hyperdrive-daemon-client') const { dbGet, dbCollect } = require('./common') -const log = require('../log').child({ component: 'network-manager' }) +const log = require('./log').child({ component: 'network-manager' }) class NetworkManager extends Nanoresource { constructor (networking, db, opts = {}) { @@ -19,7 +19,8 @@ class NetworkManager extends Nanoresource { this.db = db this.noAnnounce = !!opts.noAnnounce - this._seedIndex = sub(this.db, 'seeding', { valueEncoding: 'json' }) + const dbs = NetworkManager.generateSubDbs(db) + this._seedIndex = dbs.seeding } ready () { @@ -34,7 +35,7 @@ class NetworkManager extends Nanoresource { async _rejoin () { if (this.noAnnounce) return - const seedList = await collect(this._seedIndex) + const seedList = await dbCollect(this._seedIndex) for (const { key: discoveryKey, value: networkOpts } of seedList) { const opts = networkOpts && networkOpts.opts if (!opts || !opts.announce) continue @@ -112,10 +113,9 @@ class NetworkManager extends Nanoresource { return rsp } } -NetworkManager.SUB_DBS = { - seeding: { - prefix: 'seeding', - valueEncoding: 'json' +NetworkManager.generateSubDbs = function (db) { + return { + seeding: sub(db, 'seeding', { valueEncoding: 'json '}) } } diff --git a/lib/peers.js b/lib/peers.js index 40876fb..32045d5 100644 --- a/lib/peers.js +++ b/lib/peers.js @@ -5,7 +5,7 @@ const { rpc } = require('hyperdrive-daemon-client') const messages = rpc.peers.messages const WatchPeersTypes = messages.WatchPeersResponse.Type -const log = require('../log').child({ component: 'peers' }) +const log = require('./log').child({ component: 'peers' }) const ALIAS = Symbol('hyperdrive-peer-alias') diff --git a/lib/peersockets.js b/lib/peersockets.js index 1c36c1a..6ef7156 100644 --- a/lib/peersockets.js +++ b/lib/peersockets.js @@ -2,7 +2,7 @@ const { EventEmitter } = require('events') const { rpc } = require('hyperdrive-daemon-client') const messages = rpc.peersockets.messages -const log = require('../log').child({ component: 'peersockets' }) +const log = require('./log').child({ component: 'peersockets' }) const PeerMessageTypes = messages.PeerMessage.Type From c1ee51f210593a961f4f8f0697d3dbbb5f851cfc Mon Sep 17 00:00:00 2001 From: Andrew Osheroff Date: Tue, 9 Jun 2020 15:47:41 +0200 Subject: [PATCH 15/28] Undo refactoring --- index.js | 12 +---- lib/drives/index.js | 91 ++++++++++++++++++++++++++++---- lib/network.js | 123 -------------------------------------------- 3 files changed, 81 insertions(+), 145 deletions(-) delete mode 100644 lib/network.js diff --git a/index.js b/index.js index fa0e8ff..12b950c 100644 --- a/index.js +++ b/index.js @@ -17,8 +17,6 @@ const { rpc, apiVersion } = require('hyperdrive-daemon-client') const { createMetadata } = require('./lib/metadata') const constants = require('hyperdrive-daemon-client/lib/constants') -const Migrations = require('./lib/migrations') -const NetworkManager = require('./lib/network') const DriveManager = require('./lib/drives') const PeersocketManager = require('./lib/peersockets') const PeersManager = require('./lib/peers') @@ -97,7 +95,6 @@ class HyperdriveDaemon extends EventEmitter { if (opts.latency !== undefined) this._networkOpts.latency = +opts.latency // Set in ready. - this.migrations = null this.networking = null this.db = null this.drives = null @@ -142,7 +139,6 @@ class HyperdriveDaemon extends EventEmitter { this.db = this._dbProvider(`${this.storage}/db`, { valueEncoding: 'json' }) const dbs = { - migrations: sub(this.db, 'migrations', { valueEncoding: varint }), fuse: sub(this.db, 'fuse', { valueEncoding: bjson }), drives: sub(this.db, 'drives', { valueEncoding: bjson }), network: sub(this.db, 'network', { valueEncoding: 'json'}) @@ -151,9 +147,6 @@ class HyperdriveDaemon extends EventEmitter { await this.corestore.ready() - this.migrations = new Migrations(dbs) - await this.migrations.ensureMigrated() - const seed = this.corestore._deriveSecret(NAMESPACE, 'replication-keypair') const swarmId = this.corestore._deriveSecret(NAMESPACE, 'swarm-id') this._networkOpts.keyPair = HypercoreProtocol.keyPair(seed) @@ -182,10 +175,7 @@ class HyperdriveDaemon extends EventEmitter { this.peersockets = new PeersocketManager(this.networking, this.peers, peersockets) if (!this.noDebug) this.debug = new DebugManager(this) - this.network = new NetworkManager(this.networking, dbs.network) - await this.network.ready() - - this.drives = new DriveManager(this.corestore, this.network, dbs.drives, { + this.drives = new DriveManager(this.corestore, this.networking, dbs.drives, { ...this.opts, memoryOnly: this.memoryOnly, watchLimit: this.opts.watchLimit || WATCH_LIMIT diff --git a/lib/drives/index.js b/lib/drives/index.js index c1828c2..1ff8887 100644 --- a/lib/drives/index.js +++ b/lib/drives/index.js @@ -1,4 +1,5 @@ const hyperdrive = require('hyperdrive') +const hypercoreCrypto = require('hypercore-crypto') const datEncoding = require('dat-encoding') const pump = require('pump') const sub = require('subleveldown') @@ -7,7 +8,6 @@ const collectStream = require('stream-collector') const { NanoresourcePromise: Nanoresource } = require('nanoresource-promise/emitter') const { Transform } = require('streamx') - const { fromHyperdriveOptions, fromStat, @@ -15,6 +15,7 @@ const { fromMetadata, fromDriveConfiguration, fromNetworkConfiguration, + toNetworkConfiguration, toHyperdriveOptions, toStat, toMount, @@ -33,11 +34,11 @@ const log = require('../log').child({ component: 'drive-manager' }) const TRIE_UPDATER_SYMBOL = Symbol('hyperdrive-daemon-trie-updater') class DriveManager extends Nanoresource { - constructor (corestore, network, db, opts = {}) { + constructor (corestore, networking, db, opts = {}) { super() this.corestore = corestore - this.network = network + this.networking = networking this.db = db this.opts = opts this.watchLimit = opts.watchLimit @@ -65,10 +66,21 @@ class DriveManager extends Nanoresource { async _open () { return Promise.all([ + this._rejoin(), this._remirror() ]) } + async _rejoin () { + if (this.noAnnounce) return + const seedList = await dbCollect(this._seedIndex) + for (const { key: discoveryKey, value: networkOpts } of seedList) { + const opts = networkOpts && networkOpts.opts + if (!opts || !opts.announce) continue + this.networking.join(discoveryKey, { ...networkOpts.opts }) + } + } + async _remirror () { const mirrorList = await dbCollect(this._mirrorIndex) for (const { key } of mirrorList) { @@ -179,6 +191,64 @@ class DriveManager extends Nanoresource { }) } + async configureNetwork (discoveryKey, opts = {}) { + const self = this + const encodedKey = datEncoding.encode(discoveryKey) + const networkOpts = { + lookup: !!opts.lookup, + announce: !!opts.announce, + } + const seeding = opts.lookup || opts.announce + var networkingPromise + + const sameConfig = sameNetworkConfig(discoveryKey, opts) + // If all the networking options are the same, exit early. + if (sameConfig) return + + const networkConfig = { opts: networkOpts } + if (seeding) await this._seedIndex.put(encodedKey, networkConfig) + else await this._seedIndex.del(encodedKey) + + // Failsafe + if (networkOpts.announce && this.noAnnounce) networkOpts.announce = false + + try { + if (seeding) { + networkingPromise = this.networking.join(discoveryKey, networkOpts) + } else { + networkingPromise = this.networking.leave(discoveryKey) + } + networkingPromise.then(configurationSuccess) + networkingPromise.catch(configurationError) + } catch (err) { + configurationError(err) + } + + function sameNetworkConfig (discoveryKey, opts = {}) { + const swarmStatus = self.networking.status(discoveryKey) + if (!swarmStatus) return opts.lookup === false && opts.announce === false + return swarmStatus.announce === opts.announce && swarmStatus.lookup === opts.lookup + } + + function configurationError (err) { + log.error({ err, discoveryKey: encodedKey }, 'network configuration error') + } + + function configurationSuccess () { + log.debug({ discoveryKey: encodedKey }, 'network configuration succeeded') + } + } + + async getNetworkConfiguration (discoveryKey) { + const networkOpts = await dbGet(this._seedIndex, datEncoding.encode(discoveryKey)) + return networkOpts ? networkOpts.opts : null + } + + async getAllNetworkConfigurations () { + const storedConfigurations = (await dbCollect(this._seedIndex)).map(({ key, value }) => [key, value]) + return new Map(storedConfigurations) + } + async getAllStats (opts) { const allStats = [] for (const [, drive] of this._drives) { @@ -310,10 +380,10 @@ class DriveManager extends Nanoresource { if (opts.fuseNetwork) { // TODO: The Network drive does not announce or remember any settings for now. initialConfig = { lookup: true, announce: false, remember: false } - await this.network.configure(drive.metadata, initialConfig) + await this.configureNetwork(drive.metadata, initialConfig) } else if (!drive.writable || opts.seed) { initialConfig = { lookup: true, announce: false, remember: true } - await this.network.configure(drive.metadata, initialConfig) + await this.configureNetwork(drive.metadata, initialConfig) } // Make sure that any inner mounts are recorded in the drive index. @@ -323,15 +393,15 @@ class DriveManager extends Nanoresource { const mountKey = feed.key.toString('hex') log.info({ discoveryKey: feed.discoveryKey.toString('hex') }, 'registering mountpoint in drive index') - const parentConfig = (await this.network.getConfiguration(drive.discoveryKey)) || initialConfig || {} - const existingMountConfig = (await this.network.getConfiguration(feed.discoveryKey)) || {} + const parentConfig = (await this.getNetworkConfiguration(drive.discoveryKey)) || initialConfig || {} + const existingMountConfig = (await this.getNetworkConfiguration(feed.discoveryKey)) || {} const mountConfig = { lookup: (existingMountConfig.lookup !== false) && (parentConfig.lookup !== false), announce: !!(existingMountConfig.announce || parentConfig.announce), remember: true } - if (mountConfig) await this.network.configure(feed.discoveryKey, mountConfig) + if (mountConfig) await this.configureNetwork(feed.discoveryKey, mountConfig) this.emit('configured-mount', feed.key) this._configuredMounts.add(mountKey) @@ -412,8 +482,7 @@ class DriveManager extends Nanoresource { } async _rpcAllNetworkConfigurations (call) { - return this.network._rpcAllConfigurations(call) - const networkConfigurations = await this.network.getAllConfigurations() + const networkConfigurations = await this.getAllNetworkConfigurations() const rsp = new rpc.drive.messages.NetworkConfigurationsResponse() rsp.setConfigurationsList([...networkConfigurations].map(([, value]) => toNetworkConfiguration({ @@ -452,7 +521,7 @@ class DriveManager extends Nanoresource { const drive = this.driveForSession(id) const opts = fromNetworkConfiguration(call.request.getNetwork()) - await this.network.configure(drive.metadata.discoveryKey, { ...opts }) + await this.configureNetwork(drive.metadata.discoveryKey, { ...opts }) const rsp = new rpc.drive.messages.ConfigureNetworkResponse() return rsp diff --git a/lib/network.js b/lib/network.js deleted file mode 100644 index fdd6274..0000000 --- a/lib/network.js +++ /dev/null @@ -1,123 +0,0 @@ -const sub = require('subleveldown') -const bjson = require('buffer-json-encoding') -const datEncoding = require('dat-encoding') -const { NanoresourcePromise: Nanoresource } = require('nanoresource-promise/emitter') - -const { - fromNetworkConfiguration, - toNetworkConfiguration -} = require('hyperdrive-daemon-client/lib/common') -const { rpc } = require('hyperdrive-daemon-client') - -const { dbGet, dbCollect } = require('./common') -const log = require('./log').child({ component: 'network-manager' }) - -class NetworkManager extends Nanoresource { - constructor (networking, db, opts = {}) { - super() - this.networking = networking - this.db = db - this.noAnnounce = !!opts.noAnnounce - - const dbs = NetworkManager.generateSubDbs(db) - this._seedIndex = dbs.seeding - } - - ready () { - return this.open() - } - - async _open () { - return Promise.all([ - this._rejoin() - ]) - } - - async _rejoin () { - if (this.noAnnounce) return - const seedList = await dbCollect(this._seedIndex) - for (const { key: discoveryKey, value: networkOpts } of seedList) { - const opts = networkOpts && networkOpts.opts - if (!opts || !opts.announce) continue - this.networking.join(discoveryKey, { ...networkOpts.opts, loadForLength: true }) - } - } - - async configure (discoveryKey, opts = {}) { - const self = this - const encodedKey = datEncoding.encode(discoveryKey) - const networkOpts = { - lookup: !!opts.lookup, - announce: !!opts.announce, - } - const seeding = opts.lookup || opts.announce - var networkingPromise - - const sameConfig = sameNetworkConfig(discoveryKey, opts) - // If all the networking options are the same, exit early. - if (sameConfig) return - - const networkConfig = { opts: networkOpts } - if (seeding) await this._seedIndex.put(encodedKey, networkConfig) - else await this._seedIndex.del(encodedKey) - - // Failsafe - if (networkOpts.announce && this.noAnnounce) networkOpts.announce = false - - try { - if (seeding) { - networkingPromise = this.networking.join(discoveryKey, networkOpts) - } else { - networkingPromise = this.networking.leave(discoveryKey) - } - networkingPromise.then(configurationSuccess) - networkingPromise.catch(configurationError) - } catch (err) { - configurationError(err) - } - - function sameNetworkConfig (discoveryKey, opts = {}) { - const swarmStatus = self.networking.status(discoveryKey) - if (!swarmStatus) return opts.lookup === false && opts.announce === false - return swarmStatus.announce === opts.announce && swarmStatus.lookup === opts.lookup - } - - function configurationError (err) { - log.error({ err, discoveryKey: encodedKey }, 'network configuration error') - } - - function configurationSuccess () { - log.debug({ discoveryKey: encodedKey }, 'network configuration succeeded') - } - } - - async getConfiguration (discoveryKey) { - const networkOpts = await dbGet(this._seedIndex, datEncoding.encode(discoveryKey)) - return networkOpts ? networkOpts.opts : null - } - - async getAllConfigurations () { - const storedConfigurations = (await dbCollect(this._seedIndex)).map(({ key, value }) => [key, value]) - return new Map(storedConfigurations) - } - - async _rpcAllNetworkConfigurations (call) { - const networkConfigurations = await this.getAllConfigurations() - - const rsp = new rpc.drive.messages.NetworkConfigurationsResponse() - rsp.setConfigurationsList([...networkConfigurations].map(([, value]) => toNetworkConfiguration({ - ...value.opts, - key: Buffer.from(value.key, 'hex') - }))) - - return rsp - } -} -NetworkManager.generateSubDbs = function (db) { - return { - seeding: sub(db, 'seeding', { valueEncoding: 'json '}) - } -} - -module.exports = NetworkManager - From e1800121d62df84d587dfc70bdc8881e6afe09f2 Mon Sep 17 00:00:00 2001 From: Andrew Osheroff Date: Tue, 9 Jun 2020 17:06:11 +0200 Subject: [PATCH 16/28] Finished undoing unnecessary refactoring --- lib/common.js | 2 +- lib/drives/index.js | 49 +++++++++++++++++++++++++++------------------ 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/lib/common.js b/lib/common.js index a33705c..b7323ec 100644 --- a/lib/common.js +++ b/lib/common.js @@ -5,7 +5,7 @@ function getHandlers (manager) { const rpcMethods = Object.getOwnPropertyNames(manager.__proto__).filter(methodName => methodName.startsWith('_rpc')) for (let methodName of rpcMethods) { let rpcMethodName = methodName.slice(4) - rpcMethodName = rpcMethodName.charCodeAt(0).toLowerCase() + rpcMethodName.slice(1) + rpcMethodName = rpcMethodName[0].toLowerCase() + rpcMethodName.slice(1) handlers[rpcMethodName] = manager[methodName].bind(manager) } return handlers diff --git a/lib/drives/index.js b/lib/drives/index.js index 1ff8887..717a5eb 100644 --- a/lib/drives/index.js +++ b/lib/drives/index.js @@ -48,7 +48,9 @@ class DriveManager extends Nanoresource { this._driveIndex = dbs.drives this._mirrorIndex = dbs.mirrors + this._seedIndex = dbs.seeding + this._transientSeedIndex = new Map() this._drives = new Map() this._checkouts = new Map() this._watchers = new Map() @@ -191,32 +193,37 @@ class DriveManager extends Nanoresource { }) } - async configureNetwork (discoveryKey, opts = {}) { + async configureNetwork (feed, opts = {}) { const self = this - const encodedKey = datEncoding.encode(discoveryKey) + const encodedKey = datEncoding.encode(feed.discoveryKey) const networkOpts = { lookup: !!opts.lookup, announce: !!opts.announce, + remember: !!opts.remember } const seeding = opts.lookup || opts.announce var networkingPromise - const sameConfig = sameNetworkConfig(discoveryKey, opts) + const sameConfig = sameNetworkConfig(feed.discoveryKey, opts) // If all the networking options are the same, exit early. if (sameConfig) return - const networkConfig = { opts: networkOpts } - if (seeding) await this._seedIndex.put(encodedKey, networkConfig) - else await this._seedIndex.del(encodedKey) + const networkConfig = { key: datEncoding.encode(feed.key), opts: networkOpts } + if (opts.remember) { + if (seeding) await this._seedIndex.put(encodedKey, networkConfig) + else await this._seedIndex.del(encodedKey) + } else { + this._transientSeedIndex.set(encodedKey, networkConfig) + } // Failsafe if (networkOpts.announce && this.noAnnounce) networkOpts.announce = false try { if (seeding) { - networkingPromise = this.networking.join(discoveryKey, networkOpts) + networkingPromise = this.networking.join(feed.discoveryKey, networkOpts) } else { - networkingPromise = this.networking.leave(discoveryKey) + networkingPromise = this.networking.leave(feed.discoveryKey) } networkingPromise.then(configurationSuccess) networkingPromise.catch(configurationError) @@ -240,13 +247,16 @@ class DriveManager extends Nanoresource { } async getNetworkConfiguration (discoveryKey) { - const networkOpts = await dbGet(this._seedIndex, datEncoding.encode(discoveryKey)) - return networkOpts ? networkOpts.opts : null + const encodedKey = datEncoding.encode(discoveryKey) + const networkOpts = this._transientSeedIndex.get(encodedKey) || await dbGet(this._seedIndex, encodedKey) + if (networkOpts) return networkOpts.opts + return null } async getAllNetworkConfigurations () { const storedConfigurations = (await dbCollect(this._seedIndex)).map(({ key, value }) => [key, value]) - return new Map(storedConfigurations) + const transientConfigurations = [...this._transientSeedIndex] + return new Map([...storedConfigurations, ...transientConfigurations]) } async getAllStats (opts) { @@ -378,11 +388,11 @@ class DriveManager extends Nanoresource { var initialConfig // TODO: Need to fully work through all the default networking behaviors. if (opts.fuseNetwork) { - // TODO: The Network drive does not announce or remember any settings for now. - initialConfig = { lookup: true, announce: false, remember: false } + // TODO: The Network drive does not announce any settings for now. + initialConfig = { lookup: true, announce: false } await this.configureNetwork(drive.metadata, initialConfig) } else if (!drive.writable || opts.seed) { - initialConfig = { lookup: true, announce: false, remember: true } + initialConfig = { lookup: true, announce: false } await this.configureNetwork(drive.metadata, initialConfig) } @@ -397,11 +407,10 @@ class DriveManager extends Nanoresource { const existingMountConfig = (await this.getNetworkConfiguration(feed.discoveryKey)) || {} const mountConfig = { lookup: (existingMountConfig.lookup !== false) && (parentConfig.lookup !== false), - announce: !!(existingMountConfig.announce || parentConfig.announce), - remember: true + announce: !!(existingMountConfig.announce || parentConfig.announce) } - if (mountConfig) await this.configureNetwork(feed.discoveryKey, mountConfig) + if (mountConfig) await this.configureNetwork(feed, mountConfig) this.emit('configured-mount', feed.key) this._configuredMounts.add(mountKey) @@ -521,7 +530,7 @@ class DriveManager extends Nanoresource { const drive = this.driveForSession(id) const opts = fromNetworkConfiguration(call.request.getNetwork()) - await this.configureNetwork(drive.metadata.discoveryKey, { ...opts }) + await this.configureNetwork(drive.metadata, { ...opts }) const rsp = new rpc.drive.messages.ConfigureNetworkResponse() return rsp @@ -536,7 +545,7 @@ class DriveManager extends Nanoresource { const recursive = call.request.getRecursive() const networkingOnly = call.request.getNetworkingonly() const driveStats = await this.getDriveStats(drive, { recursive, networkingOnly }) - const networkConfig = await this.networking.getConfiguration(drive.discoveryKey) + const networkConfig = await this.getNetworkConfiguration(drive.discoveryKey) const rsp = new rpc.drive.messages.DriveStatsResponse() rsp.setStats(toDriveStats(driveStats)) @@ -1092,7 +1101,7 @@ DriveManager.generateSubDbs = function (db) { return { drives: sub(db, 'drives', { valueEncoding: 'bjson' }), mirrors: sub(db, 'mirrors', { valueEncoding: 'utf8' }), - seeding: sub(db, 'seeding', { valueEncoding: 'json '}) + seeding: sub(db, 'seeding', { valueEncoding: 'json' }) } } From 011835bdc32ab97e9a125fe4899b7e2975e6737f Mon Sep 17 00:00:00 2001 From: Andrew Osheroff Date: Tue, 9 Jun 2020 17:38:13 +0200 Subject: [PATCH 17/28] Add unmirror method + mirror tests --- lib/drives/index.js | 22 +++++++ test/replication.js | 144 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 166 insertions(+) diff --git a/lib/drives/index.js b/lib/drives/index.js index 717a5eb..ae4af96 100644 --- a/lib/drives/index.js +++ b/lib/drives/index.js @@ -553,6 +553,28 @@ class DriveManager extends Nanoresource { return rsp } + async _rpcMirror (call) { + const id = call.request.getId() + + if (!id) throw new Error('A mirror request must specify a session ID.') + const drive = this.driveForSession(id) + await this._startMirroring(drive) + + const rsp = new rpc.drive.messages.MirrorResponse() + return rsp + } + + async _rpcUnmirror (call) { + const id = call.request.getId() + + if (!id) throw new Error('An unmirror request must specify a session ID.') + const drive = this.driveForSession(id) + await this._stopMirroring(drive) + + const rsp = new rpc.drive.messages.UnmirrorResponse() + return rsp + } + async _rpcDownload (call) { const id = call.request.getId() const path = call.request.getPath() diff --git a/test/replication.js b/test/replication.js index 795be8e..56eee97 100644 --- a/test/replication.js +++ b/test/replication.js @@ -175,6 +175,150 @@ test('can cancel an active download', async t => { } }) +test('can mirror a single drive', async t => { + const { clients, cleanup } = await create(2) + const firstClient = clients[0] + const secondClient = clients[1] + + try { + const drive1 = await firstClient.drive.get() + await drive1.configureNetwork({ lookup: true, announce: true }) + + const drive2 = await secondClient.drive.get({ key: drive1.key }) + + await drive1.writeFile('/a/1', 'hello') + await drive1.writeFile('/a/2', 'world') + await drive1.writeFile('/a/3', 'three') + await drive1.writeFile('/a/4', 'four') + await drive1.writeFile('/a/5', 'five') + + // 100 ms delay for replication. + await delay(100) + + const d2Stats1 = await drive2.stats() + var stats = d2Stats1.stats + + // Since there has not been a content read yet, the stats will not report the latest content length. + t.same(stats[0].content.totalBlocks, 0) + + await drive2.mirror() + + // 200 ms delay for download to complete. + await delay(200) + + const d2Stats2 = await drive2.stats() + stats = d2Stats2.stats + + const fileStats = await drive2.fileStats('a') + t.same(stats[0].content.totalBlocks, 5) + t.same(stats[0].content.downloadedBlocks, 5) + t.same(fileStats.get('/a/1').downloadedBlocks, 1) + t.same(fileStats.get('/a/2').downloadedBlocks, 1) + t.same(fileStats.get('/a/3').downloadedBlocks, 1) + t.same(fileStats.get('/a/4').downloadedBlocks, 1) + t.same(fileStats.get('/a/5').downloadedBlocks, 1) + } catch (err) { + t.fail(err) + } + + await cleanup() + t.end() +}) + +test('can mirror a drive with mounts', async t => { + const { clients, cleanup } = await create(2) + const firstClient = clients[0] + const secondClient = clients[1] + + try { + const drive1 = await firstClient.drive.get() + const mount = await firstClient.drive.get() + await drive1.configureNetwork({ lookup: true, announce: true }) + + const drive2 = await secondClient.drive.get({ key: drive1.key }) + + await drive1.mount('/a', { key: mount.key }) + await mount.writeFile('2', 'world') + await mount.writeFile('3', 'three') + await mount.writeFile('4', 'four') + await mount.writeFile('5', 'five') + + // 100 ms delay for replication. + await delay(100) + + const d2Stats1 = await drive2.stats() + var stats = d2Stats1.stats + + await drive2.mirror() + + // 200 ms delay for download to complete. + await delay(200) + + const d2Stats2 = await drive2.stats() + stats = d2Stats2.stats + + const fileStats = await drive2.fileStats('a') + t.same(stats[1].content.totalBlocks, 4) + t.same(stats[1].content.downloadedBlocks, 4) + t.same(fileStats.get('/a/2').downloadedBlocks, 1) + t.same(fileStats.get('/a/3').downloadedBlocks, 1) + t.same(fileStats.get('/a/4').downloadedBlocks, 1) + t.same(fileStats.get('/a/5').downloadedBlocks, 1) + } catch (err) { + t.fail(err) + } + + await cleanup() + t.end() +}) + +test('can cancel an active mirror', async t => { + const { clients, cleanup } = await create(2) + const firstClient = clients[0] + const secondClient = clients[1] + + try { + const drive1 = await firstClient.drive.get() + await drive1.configureNetwork({ lookup: true, announce: true }) + + const drive2 = await secondClient.drive.get({ key: drive1.key }) + + await writeFile(drive1, '/a/1', 50) + await writeFile(drive1, '/a/2', 50) + + const unmirror = await drive2.mirror() + await delay(100) + await unmirror() + + // Wait to make sure that the download is not continuing. + await delay(100) + + const { stats: totals } = await drive2.stats() + const fileStats = await drive2.fileStats('a') + const contentTotals = totals[0].content + t.true(contentTotals.downloadedBlocks < 100 && contentTotals.downloadedBlocks > 0) + t.true(fileStats.get('/a/1').downloadedBlocks < 50 && fileStats.get('/a/1').downloadedBlocks > 0) + t.true(fileStats.get('/a/2').downloadedBlocks < 50 && fileStats.get('/a/2').downloadedBlocks > 0) + } catch (err) { + t.fail(err) + } + + await cleanup() + t.end() + + async function writeFile (drive, name, numBlocks) { + const writeStream = drive.createWriteStream(name) + return new Promise((resolve, reject) => { + writeStream.on('finish', resolve) + writeStream.on('error', reject) + for (let i = 0; i < numBlocks; i++) { + writeStream.write(Buffer.alloc(1024 * 1024).fill('abcdefg')) + } + writeStream.end() + }) + } +}) + test('can replicate many mounted drives between daemons', async t => { const { clients, cleanup } = await create(2) console.time('many-mounts') From df91ee1c14357314f8f92b88491edea442f68f7c Mon Sep 17 00:00:00 2001 From: Andrew Osheroff Date: Tue, 9 Jun 2020 17:49:53 +0200 Subject: [PATCH 18/28] 1.14.0 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index bb37afd..dbc5363 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "hyperdrive-daemon", - "version": "1.13.18", + "version": "1.14.0", "description": "A FUSE-mountable distributed filesystem, built on Hyperdrive", "main": "index.js", "bin": { From b8e3d1b181894ffb5ce243504a88a21495e8c803 Mon Sep 17 00:00:00 2001 From: Andrew Osheroff Date: Tue, 9 Jun 2020 17:53:41 +0200 Subject: [PATCH 19/28] Remove unused migrations file --- lib/migrations/index.js | 24 ------------------------ 1 file changed, 24 deletions(-) delete mode 100644 lib/migrations/index.js diff --git a/lib/migrations/index.js b/lib/migrations/index.js deleted file mode 100644 index e0227ff..0000000 --- a/lib/migrations/index.js +++ /dev/null @@ -1,24 +0,0 @@ -const { dbGet } = require('../common') - -const LAST_MIGRATION_KEY = 'last_migration' -const migrations = [ - require('./1-move-seeding-db.js') -] - -class Migrations { - constructor (dbs) { - this.dbs = dbs - } - - async ensureMigrated () { - var lastMigration = await dbGet(this.dbs.migrations, LAST_MIGRATION_KEY) - for (let i = 0; i < migrations.length; i++) { - if (lastMigration && (i < lastMigration)) continue - await migrations[i](this.dbs) - lastMigration = i + 1 - await this.dbs.migrations.put(LAST_MIGRATION_KEY, lastMigration) - } - } -} - -module.exports = Migrations From 37fb59a7e7a89aca2cef5bd1c769d29f0dbd5ec6 Mon Sep 17 00:00:00 2001 From: Andrew Osheroff Date: Tue, 9 Jun 2020 17:53:46 +0200 Subject: [PATCH 20/28] 1.14.1 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index dbc5363..c396851 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "hyperdrive-daemon", - "version": "1.14.0", + "version": "1.14.1", "description": "A FUSE-mountable distributed filesystem, built on Hyperdrive", "main": "index.js", "bin": { From d864a089c656cf9e2072ed09975d19bbbed67430 Mon Sep 17 00:00:00 2001 From: Diego Paez Date: Tue, 12 May 2020 15:54:25 -0300 Subject: [PATCH 21/28] Update: add hyperdrive client import/export tests --- test/hyperdrive.js | 65 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/test/hyperdrive.js b/test/hyperdrive.js index 32a9283..251a9d4 100644 --- a/test/hyperdrive.js +++ b/test/hyperdrive.js @@ -1,3 +1,7 @@ +const { promises: { mkdtemp, writeFile, readdir } } = require('fs') +const { tmpdir } = require('os') +const { once } = require('events') +const { join } = require('path') const test = require('tape') const collectStream = require('stream-collector') @@ -939,6 +943,67 @@ test('can get all network configurations', async t => { t.end() }) +test.only('import', async t => { + const { client, cleanup } = await createOne() + + // create a tmp dir + const tmpDir = await mkdtemp(join(tmpdir(), 'import-test')) + await writeFile(join(tmpDir, 'test.txt'), 'hello world') + + try { + // now import the dir + const drive = await client.drive.get() + const importProgress = client.drive.import(tmpDir, drive) + + const [, dst] = await once(importProgress, 'put-end') + t.same(dst.name, '/test.txt') + const contents = await drive.readFile('test.txt', { encoding: 'utf8' }) + t.same(contents, 'hello world') + + await drive.close() + + importProgress.destroy() + } catch (err) { + t.fail(err) + } + + await cleanup() + t.end() +}) + +test('export', async t => { + const { client, cleanup } = await createOne() + + // create a tmp dir + const tmpDir = await mkdtemp(join(tmpdir(), 'export-test')) + + try { + // create a test drive + const drive = await client.drive.get() + await drive.writeFile('export.txt', 'hello world') + + const dw = client.drive.export(drive, tmpDir) + await dw.start() + + let total, downloaded + dw.on('stats', async stats => { + total = stats.total + downloaded = stats.downloaded + console.log({ total, downloaded }) + if (total === downloaded) { + t.pass('stats OK') + const contents = await readdir(join(tmpDir, 'export.txt'), { encoding: 'utf8' }) + t.same(contents, 'hello world') + await drive.close() + await cleanup() + t.end() + } + }) + } catch (err) { + t.fail(err) + } +}) + // TODO: Figure out why the grpc server is not terminating. test.onFinish(() => { setTimeout(() => { From 6f89671e4351a6ad9e843c79b4cbcdc55c3f4318 Mon Sep 17 00:00:00 2001 From: Diego Paez Date: Tue, 12 May 2020 16:22:33 -0300 Subject: [PATCH 22/28] Update: remove tape only call --- test/hyperdrive.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/hyperdrive.js b/test/hyperdrive.js index 251a9d4..89830fa 100644 --- a/test/hyperdrive.js +++ b/test/hyperdrive.js @@ -943,7 +943,7 @@ test('can get all network configurations', async t => { t.end() }) -test.only('import', async t => { +test('import', async t => { const { client, cleanup } = await createOne() // create a tmp dir From 480d5ed6985b346e0aedd28a23515603172f8593 Mon Sep 17 00:00:00 2001 From: Diego Paez Date: Fri, 15 May 2020 20:37:19 -0300 Subject: [PATCH 23/28] Update: tweak export test --- test/hyperdrive.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/hyperdrive.js b/test/hyperdrive.js index 89830fa..dae118c 100644 --- a/test/hyperdrive.js +++ b/test/hyperdrive.js @@ -1,4 +1,4 @@ -const { promises: { mkdtemp, writeFile, readdir } } = require('fs') +const { promises: { mkdtemp, writeFile, readFile } } = require('fs') const { tmpdir } = require('os') const { once } = require('events') const { join } = require('path') @@ -989,10 +989,9 @@ test('export', async t => { dw.on('stats', async stats => { total = stats.total downloaded = stats.downloaded - console.log({ total, downloaded }) if (total === downloaded) { t.pass('stats OK') - const contents = await readdir(join(tmpDir, 'export.txt'), { encoding: 'utf8' }) + const contents = await readFile(join(tmpDir, 'export.txt'), { encoding: 'utf8' }) t.same(contents, 'hello world') await drive.close() await cleanup() From 08cc3473298a642d7254b22e1a4bc54db3af324d Mon Sep 17 00:00:00 2001 From: Andrew Osheroff Date: Tue, 16 Jun 2020 21:21:16 +0200 Subject: [PATCH 24/28] Fix FUSE RPC methods --- lib/fuse/index.js | 94 +++++++++++++++++++++++------------------------ 1 file changed, 46 insertions(+), 48 deletions(-) diff --git a/lib/fuse/index.js b/lib/fuse/index.js index 8538a6a..1be4087 100644 --- a/lib/fuse/index.js +++ b/lib/fuse/index.js @@ -613,67 +613,65 @@ class FuseManager extends EventEmitter { return new Map([...this._drives]) } - getHandlers () { - return { - mount: async (call) => { - var mountOpts = call.request.getOpts() - const mnt = call.request.getPath() - if (mountOpts) mountOpts = fromHyperdriveOptions(mountOpts) + // RPC Methods - if (!mnt) throw new Error('A mount request must specify a mountpoint.') - const mountInfo = await this.mount(mnt, mountOpts) + async _rpcMount (call) { + var mountOpts = call.request.getOpts() + const mnt = call.request.getPath() + if (mountOpts) mountOpts = fromHyperdriveOptions(mountOpts) - const rsp = new rpc.fuse.messages.MountResponse() - rsp.setMountinfo(toHyperdriveOptions(mountInfo)) - rsp.setPath(mnt) + if (!mnt) throw new Error('A mount request must specify a mountpoint.') + const mountInfo = await this.mount(mnt, mountOpts) - return rsp - }, + const rsp = new rpc.fuse.messages.MountResponse() + rsp.setMountinfo(toHyperdriveOptions(mountInfo)) + rsp.setPath(mnt) - unmount: async (call) => { - const mnt = call.request.getPath() + return rsp + } - await this.unmount(mnt) + async _rpcUnmount (call) { + const mnt = call.request.getPath() - return new rpc.fuse.messages.UnmountResponse() - }, + await this.unmount(mnt) - status: (call) => { - const rsp = new rpc.fuse.messages.FuseStatusResponse() - rsp.setAvailable(true) - return new Promise((resolve, reject) => { - hyperfuse.isConfigured((err, configured) => { - if (err) return reject(err) - rsp.setConfigured(configured) - return resolve(rsp) - }) - }) - }, + return new rpc.fuse.messages.UnmountResponse() + } + + async _rpcStatus (call) { + const rsp = new rpc.fuse.messages.FuseStatusResponse() + rsp.setAvailable(true) + return new Promise((resolve, reject) => { + hyperfuse.isConfigured((err, configured) => { + if (err) return reject(err) + rsp.setConfigured(configured) + return resolve(rsp) + }) + }) + } - info: async (call) => { - const rsp = new rpc.fuse.messages.InfoResponse() - const mnt = call.request.getPath() + async _rpcInfo (call) { + const rsp = new rpc.fuse.messages.InfoResponse() + const mnt = call.request.getPath() - const { key, mountPath, writable, relativePath } = await this.info(mnt) - rsp.setKey(key) - rsp.setPath(relativePath) - rsp.setMountpath(mountPath) - rsp.setWritable(writable) + const { key, mountPath, writable, relativePath } = await this.info(mnt) + rsp.setKey(key) + rsp.setPath(relativePath) + rsp.setMountpath(mountPath) + rsp.setWritable(writable) - return rsp - }, + return rsp + } - download: async (call) => { - const rsp = new rpc.fuse.messages.DownloadResponse() - const path = call.request.getPath() + async _rpcDownload (call) { + const rsp = new rpc.fuse.messages.DownloadResponse() + const path = call.request.getPath() - const { downloadId, sessionId } = await this.download(path) - rsp.setDownloadid(downloadId) - rsp.setSessionid(sessionId) + const { downloadId, sessionId } = await this.download(path) + rsp.setDownloadid(downloadId) + rsp.setSessionid(sessionId) - return rsp - } - } + return rsp } } From 618f120e0bcb32aef994c4326866947a3dcc3f35 Mon Sep 17 00:00:00 2001 From: Andrew Osheroff Date: Tue, 16 Jun 2020 21:22:22 +0200 Subject: [PATCH 25/28] 1.14.2 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index c396851..5d5e4b2 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "hyperdrive-daemon", - "version": "1.14.1", + "version": "1.14.2", "description": "A FUSE-mountable distributed filesystem, built on Hyperdrive", "main": "index.js", "bin": { From 9c91ed95c3ff08bce65be266950d86c574358f96 Mon Sep 17 00:00:00 2001 From: Diego Paez Date: Tue, 12 May 2020 15:54:25 -0300 Subject: [PATCH 26/28] Update: add hyperdrive client import/export tests --- test/hyperdrive.js | 65 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/test/hyperdrive.js b/test/hyperdrive.js index 32a9283..251a9d4 100644 --- a/test/hyperdrive.js +++ b/test/hyperdrive.js @@ -1,3 +1,7 @@ +const { promises: { mkdtemp, writeFile, readdir } } = require('fs') +const { tmpdir } = require('os') +const { once } = require('events') +const { join } = require('path') const test = require('tape') const collectStream = require('stream-collector') @@ -939,6 +943,67 @@ test('can get all network configurations', async t => { t.end() }) +test.only('import', async t => { + const { client, cleanup } = await createOne() + + // create a tmp dir + const tmpDir = await mkdtemp(join(tmpdir(), 'import-test')) + await writeFile(join(tmpDir, 'test.txt'), 'hello world') + + try { + // now import the dir + const drive = await client.drive.get() + const importProgress = client.drive.import(tmpDir, drive) + + const [, dst] = await once(importProgress, 'put-end') + t.same(dst.name, '/test.txt') + const contents = await drive.readFile('test.txt', { encoding: 'utf8' }) + t.same(contents, 'hello world') + + await drive.close() + + importProgress.destroy() + } catch (err) { + t.fail(err) + } + + await cleanup() + t.end() +}) + +test('export', async t => { + const { client, cleanup } = await createOne() + + // create a tmp dir + const tmpDir = await mkdtemp(join(tmpdir(), 'export-test')) + + try { + // create a test drive + const drive = await client.drive.get() + await drive.writeFile('export.txt', 'hello world') + + const dw = client.drive.export(drive, tmpDir) + await dw.start() + + let total, downloaded + dw.on('stats', async stats => { + total = stats.total + downloaded = stats.downloaded + console.log({ total, downloaded }) + if (total === downloaded) { + t.pass('stats OK') + const contents = await readdir(join(tmpDir, 'export.txt'), { encoding: 'utf8' }) + t.same(contents, 'hello world') + await drive.close() + await cleanup() + t.end() + } + }) + } catch (err) { + t.fail(err) + } +}) + // TODO: Figure out why the grpc server is not terminating. test.onFinish(() => { setTimeout(() => { From de1c74ff91a89fa32ebff21acb737eb9b002074b Mon Sep 17 00:00:00 2001 From: Diego Paez Date: Tue, 12 May 2020 16:22:33 -0300 Subject: [PATCH 27/28] Update: remove tape only call --- test/hyperdrive.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/hyperdrive.js b/test/hyperdrive.js index 251a9d4..89830fa 100644 --- a/test/hyperdrive.js +++ b/test/hyperdrive.js @@ -943,7 +943,7 @@ test('can get all network configurations', async t => { t.end() }) -test.only('import', async t => { +test('import', async t => { const { client, cleanup } = await createOne() // create a tmp dir From 1a56b16a4c8a87c2ac976bb806a53519a9174280 Mon Sep 17 00:00:00 2001 From: Diego Paez Date: Fri, 15 May 2020 20:37:19 -0300 Subject: [PATCH 28/28] Update: tweak export test --- test/hyperdrive.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/hyperdrive.js b/test/hyperdrive.js index 89830fa..dae118c 100644 --- a/test/hyperdrive.js +++ b/test/hyperdrive.js @@ -1,4 +1,4 @@ -const { promises: { mkdtemp, writeFile, readdir } } = require('fs') +const { promises: { mkdtemp, writeFile, readFile } } = require('fs') const { tmpdir } = require('os') const { once } = require('events') const { join } = require('path') @@ -989,10 +989,9 @@ test('export', async t => { dw.on('stats', async stats => { total = stats.total downloaded = stats.downloaded - console.log({ total, downloaded }) if (total === downloaded) { t.pass('stats OK') - const contents = await readdir(join(tmpDir, 'export.txt'), { encoding: 'utf8' }) + const contents = await readFile(join(tmpDir, 'export.txt'), { encoding: 'utf8' }) t.same(contents, 'hello world') await drive.close() await cleanup()