Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 93 additions & 72 deletions packages/node_modules/pouchdb-abstract-mapreduce/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import {
callbackify,
sequentialize,
uniq,
fin,
promisedCallback,
mapToKeysArray,
QueryParseError,
Expand Down Expand Up @@ -147,13 +146,11 @@ function createAbstractMapReduce(localDocName, mapper, reducer, ddocValidator) {
}
}

/**
 * Post-process a query result: when the caller requested included docs
 * with binary attachments, convert the attachment payloads in place.
 *
 * @param {Object} res - query result; mutated in place when converting.
 * @param {Object} opts - query options; conversion only happens when
 *   include_docs, attachments and binary are all truthy.
 * @returns {Object} the same res object, for chaining.
 */
function postprocessAttachments(res, opts) {
  if (opts.include_docs && opts.attachments && opts.binary) {
    // readAttachmentsAsBlobOrBuffer mutates res synchronously.
    readAttachmentsAsBlobOrBuffer(res);
  }
  return res;
}

function addHttpParam(paramName, opts, params, asJson) {
Expand Down Expand Up @@ -303,10 +300,7 @@ function createAbstractMapReduce(localDocName, mapper, reducer, ddocValidator) {
throw new Error(row.reason);
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this make this code path synchronous instead of asynchronous? I have not looked at the bigger picture yet, but my gut feeling says that handling attachments (large binary values) probably wants to be async?

Copy link
Contributor Author

@emilygilberts emilygilberts Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question! I was thinking that it's safe on this call, since the function addHttpParam is already async, and before calling postprocessAttachments we still await the db.fetch and response.json.

I thought that since postprocessAttachments and readAttachmentsAsBlobOrBuffer themselves are synchronous, wrapping the result in a Promise and calling postprocessAttachments in .then() only slightly delays the call — if not, this is definitely where my confusion lies! :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Emily and I paired on this and I am now convinced that postprocessAttachments() and further down are all synchronous functions, so that is all fine.

The only thing left here to find out is why the original code does the wrapping in another promise to move the post processing to the next tick.

Maybe @AlbaHerrerias you remember when you made that change [checks notes] a long time ago? ;D

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi 👋 unfortunately I do not recall — this was 3 years ago, after all. I went to the PR that introduced the change but still have no idea; it is true that it looks odd.

return new Promise(function (resolve) {
resolve(result);
}).then(postprocessAttachments(opts));
return postprocessAttachments(result, opts);
}

// We are using a temporary view, terrible for performance, good for testing
Expand All @@ -333,9 +327,7 @@ function createAbstractMapReduce(localDocName, mapper, reducer, ddocValidator) {
throw generateErrorFromResponse(result);
}

return new Promise(function (resolve) {
resolve(result);
}).then(postprocessAttachments(opts));
return postprocessAttachments(result, opts);
}

// custom adapters can define their own api._query
Expand Down Expand Up @@ -451,50 +443,63 @@ function createAbstractMapReduce(localDocName, mapper, reducer, ddocValidator) {
return processKeyValueDocs(metaDoc, keyValueDocs);
}

/**
 * Record the source database's latest purgeSeq in the view's
 * _local/purgeSeq doc, assuming all pending purges have been processed.
 *
 * Missing docs (404) are treated as defaults: no _local/purges on the
 * source DB means there is nothing to record; no _local/purgeSeq on the
 * view DB means we create it (rev stays undefined). Any other error is
 * rethrown.
 *
 * @param {Object} view - view whose sourceDB/db pair is consulted/updated.
 * @returns {Promise<Object|undefined>} the put response, or undefined
 *   when the source DB has no _local/purges doc.
 */
async function updatePurgeSeq(view) {
  // with this approach, we just assume to have processed all missing purges
  // and write the latest purgeSeq into the _local/purgeSeq doc.
  try {
    const res = await view.sourceDB.get('_local/purges');
    const purgeSeq = res.purgeSeq;

    // Fetch the current rev of the view-local doc so the put below updates
    // rather than conflicts; an absent doc leaves rev undefined (create).
    let rev;
    try {
      const purgeSeqDoc = await view.db.get('_local/purgeSeq');
      rev = purgeSeqDoc._rev;
    } catch (err) {
      if (err.status !== 404) {
        throw err;
      }
    }

    return await view.db.put({
      _id: '_local/purgeSeq',
      _rev: rev,
      purgeSeq,
    });
  } catch (err) {
    // No _local/purges doc on the source DB: nothing to record.
    if (err.status !== 404) {
      throw err;
    }
  }
}

// updates all emitted key/value docs and metaDocs in the mrview database
// for the given batch of documents from the source database
async function saveKeyValues(view, docIdsToChangesAndEmits, seq) {
  const seqDocId = '_local/lastSeq';

  // Load the checkpoint doc; a 404 means this is the first batch, so we
  // start from seq 0. Any other error is fatal.
  let lastSeqDoc;
  try {
    lastSeqDoc = await view.db.get(seqDocId);
  } catch (err) {
    if (err.status !== 404) {
      throw err;
    }
    lastSeqDoc = {_id: seqDocId, seq: 0};
  }

  const docIds = mapToKeysArray(docIdsToChangesAndEmits);
  const listOfDocsToPersist = await Promise.all(docIds.map((docId) =>
    getDocsToPersist(docId, view, docIdsToChangesAndEmits)));
  const docsToPersist = listOfDocsToPersist.flat();

  // Write all docs and the advanced checkpoint in a single bulk operation
  // so the seq only moves forward together with its documents.
  lastSeqDoc.seq = seq;
  docsToPersist.push(lastSeqDoc);
  await view.db.bulkDocs({docs: docsToPersist});

  // TODO: this should be placed somewhere else, probably? we're querying
  // both docs twice (first time when getting the actual purges).
  return updatePurgeSeq(view);
}

function getQueue(view) {
Expand Down Expand Up @@ -532,12 +537,11 @@ function createAbstractMapReduce(localDocName, mapper, reducer, ddocValidator) {

let currentSeq = view.seq || 0;

/**
 * Register a view-indexing active task on the source DB, sized by how
 * far this view's index (currentSeq, from the enclosing scope) lags
 * behind the database's update_seq. Stores the handle in the enclosing
 * taskId for later progress updates and removal.
 */
async function createTask() {
  const info = await view.sourceDB.info();
  taskId = view.sourceDB.activeTasks.add({
    name: 'view_indexing',
    total_items: info.update_seq - currentSeq,
  });
}

Expand Down Expand Up @@ -569,30 +573,50 @@ function createAbstractMapReduce(localDocName, mapper, reducer, ddocValidator) {
return processBatch(response, purges);
}

/**
 * Collect the doc ids purged from the source DB since this view last
 * recorded a purgeSeq, and fetch the current doc (if any) for each.
 *
 * Missing bookkeeping docs default gracefully: no _local/purgeSeq on the
 * view DB means nothing was processed yet (-1, i.e. all purges are
 * recent); no _local/purges on the source DB means there are no purges.
 * Uses `view` from the enclosing closure.
 *
 * @returns {Promise<Array<{docId: string, doc?: Object}>>} one entry per
 *   unique purged doc id; `doc` is absent when the doc no longer exists.
 */
async function getRecentPurges() {
  let purgeSeq;
  try {
    const res = await view.db.get('_local/purgeSeq');
    purgeSeq = res.purgeSeq;
  } catch (err) {
    if (err.status !== 404) {
      throw err;
    }
    purgeSeq = -1;
  }

  let purges;
  try {
    const res = await view.sourceDB.get('_local/purges');
    purges = res.purges;
  } catch (err) {
    if (err.status !== 404) {
      throw err;
    }
    purges = [];
  }

  const recentPurges = purges
    .filter((purge, index) => index > purgeSeq)
    .map((purge) => purge.docId);
  // Dedupe while keeping first-seen order (Set preserves insertion
  // order); replaces an O(n^2) indexOf scan with the same result.
  const uniquePurges = [...new Set(recentPurges)];

  return Promise.all(uniquePurges.map(async (docId) => {
    try {
      const doc = await view.sourceDB.get(docId);
      return { docId, doc };
    } catch (err) {
      if (err.status !== 404) {
        throw err;
      }
      // Doc was purged and not recreated: report the id alone.
      return { docId };
    }
  }));
}

function processBatch(response, purges) {
Expand Down Expand Up @@ -972,10 +996,8 @@ function createAbstractMapReduce(localDocName, mapper, reducer, ddocValidator) {
return new db.constructor(viewDBName, db.__opts).destroy();
})();
});

return Promise.all(destroyPromises).then(function () {
await Promise.all(destroyPromises);
return {ok: true};
});
} catch (err) {
if (err.status === 404) {
return {ok: true};
Expand Down Expand Up @@ -1011,10 +1033,12 @@ function createAbstractMapReduce(localDocName, mapper, reducer, ddocValidator) {
/* temporary */ true,
/* localDocName */ localDocName);

return fin(updateView(view, updateViewOpts).then(
function () { return queryView(view, opts); }),
function () { return view.db.destroy(); }
);
try {
await updateView(view, updateViewOpts);
return queryView(view, opts);
} finally {
await view.db.destroy();
}
});
return tempViewQueue.finish();
} else {
Expand Down Expand Up @@ -1068,10 +1092,7 @@ function createAbstractMapReduce(localDocName, mapper, reducer, ddocValidator) {
if (typeof fun === 'function') {
fun = {map : fun};
}

const promise = Promise.resolve().then(function () {
return queryPromised(db, fun, opts);
});
const promise = queryPromised(db, fun, opts);
promisedCallback(promise, callback);
return promise;
}
Expand Down
Loading