Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions core/base/crdt-clock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ export class CRDTClockImpl {
prevHead,
updates,
})) {
return this.processUpdates(updatesAcc, all, prevHead);
await this.processUpdates(updatesAcc, all, prevHead);
return;
}
}

Expand Down Expand Up @@ -137,7 +138,14 @@ export class CRDTClockImpl {
.Int("prevHeadLength", prevHead.length)
.Int("currentHeadLength", this.head.length)
.Msg("INT_APPLY_HEAD: Entry point");
// console.log("int_applyHead", this.applyHeadQueue.size(), this.head, newHead, prevHead, localUpdates);
this.logger
.Debug()
.Int("queueSize", this.applyHeadQueue.size())
.Any("currentHead", this.head)
.Any("newHead", newHead)
.Any("prevHead", prevHead)
.Any("localUpdates", localUpdates)
.Msg("int_applyHead processing");
const ogHead = sortClockHead(this.head);
newHead = sortClockHead(newHead);
if (compareClockHeads(ogHead, newHead)) {
Expand All @@ -153,6 +161,20 @@ export class CRDTClockImpl {
if (!this.blockstore) {
throw this.logger.Error().Msg("missing blockstore").AsError();
}
this.logger
.Debug()
.Str("dbName", this.blockstore?.crdtParent?.ledgerParent?.name || "unnamed")
.Str(
"carLog",
this.blockstore?.loader?.carLog
?.asArray()
.map((cg) => cg.map((c) => c.toString()).join(","))
.join(";") || "no-carlog",
)
.Str("validatingHead", newHead.map((h) => h.toString()).join(","))
.Msg("PRE_VALIDATION: CarLog before block validation");
// Add sleep to test race condition theory
// await sleep(1000);
await validateBlocks(this.logger, newHead, this.blockstore);
if (!this.transaction) {
this.transaction = this.blockstore.openTransaction({ noLoader, add: false });
Expand Down Expand Up @@ -216,7 +238,7 @@ async function validateBlocks(logger: Logger, newHead: ClockHead, blockstore?: B
newHead.map(async (cid) => {
const got = await blockstore.get(cid);
if (!got) {
throw logger.Error().Str("cid", cid.toString()).Msg("int_applyHead missing block").AsError();
throw logger.Error().Str("cid", cid.toString()).Msg("validateBlocks missing block").AsError();
}
});
}
Expand All @@ -230,8 +252,7 @@ async function advanceBlocks(logger: Logger, newHead: ClockHead, tblocks: CarTra
try {
head = await advance(toPailFetcher(tblocks), head, cid);
} catch (e) {
logger.Error().Err(e).Msg("failed to advance head");
// console.log('failed to advance head:', cid.toString(), e)
logger.Error().Str("cid", cid.toString()).Err(e).Msg("failed to advance head");
// continue;
}
}
Expand Down
8 changes: 6 additions & 2 deletions core/base/crdt-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import {
PARAM,
NotFoundError,
} from "@fireproof/core-types-base";
import { Logger } from "@adviser/cement";
import { exception2Result, Logger } from "@adviser/cement";
import { Link, Version } from "multiformats";

function toString<K extends IndexKeyType>(key: K, logger: Logger): string {
Expand All @@ -58,7 +58,11 @@ export function toPailFetcher(tblocks: BlockFetcher): PailBlockFetcher {
get: async <T = unknown, C extends number = number, A extends number = number, V extends Version = 1>(
link: Link<T, C, A, V>,
) => {
const block = await tblocks.get(link);
const rBlock = await exception2Result(() => tblocks.get(link));
if (rBlock.isErr()) {
return undefined;
}
const block = rBlock.Ok();
return block
? ({
cid: block.cid,
Expand Down
20 changes: 14 additions & 6 deletions core/base/crdt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ export class CRDTImpl implements CRDT {
applyMeta: async (meta: TransactionMeta) => {
const crdtMeta = meta as CRDTMeta;
if (!crdtMeta.head) throw this.logger.Error().Msg("missing head").AsError();
// console.log("applyMeta-pre", crdtMeta.head, this.clock.head);
this.logger
.Debug()
.Any("incomingHead", crdtMeta.head)
.Any("currentHead", this.clock.head)
.Msg("applyMeta-pre");
this.logger
.Debug()
.Str("newHead", crdtMeta.head.map((h) => h.toString()).join(","))
Expand All @@ -115,7 +119,11 @@ export class CRDTImpl implements CRDT {
.Str("dbName", this.opts.name || "unnamed")
.Msg("APPLY_META: Calling applyHead for REMOTE sync");
await this.clock.applyHead(crdtMeta.head, []);
// console.log("applyMeta-post", crdtMeta.head, this.clock.head);
this.logger
.Debug()
.Any("resultHead", crdtMeta.head)
.Any("clockHead", this.clock.head)
.Msg("applyMeta-post");
},
compactStrategy: rCompactStrategy.Ok(),
gatewayInterceptor: opts.gatewayInterceptor,
Expand Down Expand Up @@ -209,13 +217,13 @@ export class CRDTImpl implements CRDT {
async ready(): Promise<void> {
return this.onceReady.once(async () => {
try {
// console.log("bs-ready-pre")
this.logger.Debug().Msg("blockstore ready preparation");
// await this.blockstore.ready();
// console.log("bs-ready-post-1")
this.logger.Debug().Msg("blockstore ready completed");
// await this.indexBlockstore?.ready();
// console.log("bs-ready-post-2")
this.logger.Debug().Msg("index blockstore ready completed");
// await this.clock.ready();
// console.log("bs-ready-post-3")
this.logger.Debug().Msg("clock ready completed");
await Promise.all([
this.blockstore.ready(),
this.indexBlockstore ? this.indexBlockstore.ready() : Promise.resolve(),
Expand Down
7 changes: 5 additions & 2 deletions core/base/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
Attached,
NotFoundError,
QueryResult,
isNotFoundError,
} from "@fireproof/core-types-base";
import { ensureLogger, makeName } from "@fireproof/core-runtime";

Expand Down Expand Up @@ -78,7 +79,10 @@ export class DatabaseImpl implements Database {
const { doc } = got;
return { ...(doc as unknown as DocWithId<T>), _id: id };
} catch (e) {
throw new NotFoundError(`Not found: ${id} - ${e instanceof Error ? e.message : String(e)}`);
if (isNotFoundError(e)) {
throw e;
}
throw this.logger.Error().Err(e).Msg("unexpect error").AsError();
}
}

Expand Down Expand Up @@ -157,7 +161,6 @@ export class DatabaseImpl implements Database {
if (typeof opts.limit === "number" && opts.limit >= 0) {
rows = rows.slice(0, opts.limit);
}

return { rows, clock: head, name: this.name };
}

Expand Down
8 changes: 8 additions & 0 deletions core/blockstore/commitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ export async function commit<T>(
}),
);

// LOG: Verify CIDs before they go to writeMeta - test invariant
// console.log("COMMIT_CIDS:", cids.map(c => `${c.toString().substring(0,3)}...`));
// console.log("COMMIT_CID_VALIDATION:", {
// allCarCids: cids.every(c => c.toString().startsWith("bae")),
// anyRawCids: cids.some(c => c.toString().startsWith("baf")),
// count: cids.length
// });

// await params.carStore.save({ cid, bytes });
// const newDbMeta = { cars: cids };
// await params.WALStore.enqueue(newDbMeta, opts);
Expand Down
65 changes: 59 additions & 6 deletions core/blockstore/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import {
type Attachable,
type Attached,
type CarTransaction,
type CRDTMeta,
type DbMeta,
type Falsy,
type SuperThis,
Expand Down Expand Up @@ -132,6 +133,14 @@ class CommitAction implements CommitParams {
}

async writeMeta(cids: AnyLink[]): Promise<void> {
// LOG: Verify these are CAR CIDs, not raw block CIDs - test invariant
// this.logger.Debug()
// .Str("cidsToWrite", cids.map(c => c.toString()).join(","))
// .Str("cidsPrefix", cids.map(c => c.toString().substring(0,3)).join(","))
// .Bool("allCarCids", cids.every(c => c.toString().startsWith("bae")))
// .Bool("anyRawCids", cids.some(c => c.toString().startsWith("baf")))
// .Msg("DBMETA_CREATION: CIDs being written to DbMeta");

const meta = { cars: cids };
await this.attached.local().active.meta.save(meta);
// detached remote stores
Expand Down Expand Up @@ -218,10 +227,11 @@ export class Loader implements Loadable {
try {
const store = this.attachedStores.activate(at.stores);
await this.tryToLoadStaleCars(store);
const localDbMeta = this.currentMeta; // await store.local().active.meta.load();
const localDbMeta = this.XXXcurrentMeta; // await store.local().active.meta.load();
const remoteDbMeta = store.active.meta.stream();
await this.waitFirstMeta(remoteDbMeta.getReader(), store, { origin: store.active.meta.url() });
if (localDbMeta) {
this.logger.Warn().Any({ url: store.active.meta.url(), localDbMeta }).Msg("localDbMeta");
await this.ensureAttachedStore(store, localDbMeta);
}
/* ultra hacky */
Expand Down Expand Up @@ -320,9 +330,7 @@ export class Loader implements Loadable {
this.blockstoreParent?.crdtParent?.ledgerParent?.name,
);
const local = this.attachedStores.local();
// console.log("ready", this.id);
this.metaStreamReader = local.active.meta.stream().getReader();
// console.log("attach-local", local.active.car.url().pathname);
await this.waitFirstMeta(this.metaStreamReader, local, { meta: this.ebOpts.meta, origin: local.active.car.url() });
});
}
Expand Down Expand Up @@ -513,6 +521,13 @@ export class Loader implements Loadable {
if (this.seenMeta.has(metaKey)) return [];
this.seenMeta.add(metaKey);

// LOG: CarLog state before merge
this.logger.Debug()
.Str("beforeMerge_carLog", this.carLog.asArray().map(cg => cg.map(c => c.toString()).join(",")).join(";"))
.Str("incoming_dbMeta_cars", meta.cars.map(c => c.toString()).join(","))
.Str("loaderId", this.id)
.Msg("MERGE_BEFORE: CarLog state before merge");

// if (meta.key) {
// await this.setKey(meta.key);
// }
Expand All @@ -521,6 +536,13 @@ export class Loader implements Loadable {
}
// console.log("mergeDbMetaIntoClock", activeStore.active.car.url().pathname);
const carHeader = await this.loadCarHeaderFromMeta<TransactionMeta>(meta, activeStore);
this.logger
.Debug()
.Str("loaderId", this.id)
.Str("carCids", meta.cars.map((c) => c.toString()).join(","))
.Str("extractedHead", (carHeader.meta as CRDTMeta)?.head?.map((h) => h.toString()).join(",") || "no-head")
.Url(activeStore.active.car.url())
.Msg("MERGE_META: Extracted CAR header meta");
// fetch other cars down the compact log?
// todo we should use a CID set for the compacted cids (how to expire?)
// console.log('merge carHeader', carHeader.head.length, carHeader.head.toString(), meta.car.toString())
Expand All @@ -529,8 +551,28 @@ export class Loader implements Loadable {
if (warns.length > 0) {
this.logger.Warn().Any("warns", warns).Msg("error getting more readers");
}
// LOG: CarLog update calculation
this.logger.Debug()
.Str("uniqueCids_input", [meta.cars, ...this.carLog.asArray(), ...carHeader.cars].flat().map(c => c.toString()).join(","))
.Str("seenCompacted", "LRUSet-content")
.Int("seenCompactedSize", this.seenCompacted.size)
.Str("loaderId", this.id)
.Msg("CARLOG_UPDATE: Before uniqueCids calculation");

const cgs = uniqueCids([meta.cars, ...this.carLog.asArray(), ...carHeader.cars], this.seenCompacted);

this.logger.Debug()
.Str("uniqueCids_output", cgs.flat().map(c => c.toString()).join(","))
.Str("loaderId", this.id)
.Msg("CARLOG_UPDATE: After uniqueCids calculation");

this.carLog.update(cgs);

// LOG: CarLog state after update
this.logger.Debug()
.Str("afterUpdate_carLog", this.carLog.asArray().map(cg => cg.map(c => c.toString()).join(",")).join(";"))
.Str("loaderId", this.id)
.Msg("MERGE_AFTER: CarLog state after update");
// console.log(
// ">>>>> pre applyMeta",
// this.carLog
Expand Down Expand Up @@ -765,7 +807,6 @@ export class Loader implements Loadable {
}

async getBlock(cid: AnyLink): Promise<FPBlock | Falsy> {
await this.ready();
const got = this.cidCache.get(cid.toString());
return got.value;
}
Expand Down Expand Up @@ -828,10 +869,18 @@ export class Loader implements Loadable {
const activeStore = store.active as CarStore;
try {
//loadedCar now is an array of AnyBlocks
this.logger.Debug().Any("cid", carCidStr).Msg("loading car");
this.logger.Debug()
.Str("cid", carCidStr)
.Str("loaderId", this.id)
.Url(activeStore.url())
.Msg("NETWORK_REQUEST: About to load CAR from store");
loadedCar = await activeStore.load(carCid);
// console.log("loadedCar", carCid);
this.logger.Debug().Bool("loadedCar", loadedCar).Msg("loaded");
this.logger.Debug()
.Str("cid", carCidStr)
.Bool("loadedCar", !!loadedCar)
.Url(activeStore.url())
.Msg(loadedCar ? "NETWORK_SUCCESS: CAR loaded successfully" : "NETWORK_FAILURE: CAR load returned undefined");
} catch (e) {
if (!isNotFoundError(e)) {
throw this.logger.Error().Str("cid", carCidStr).Err(e).Msg("loading car");
Expand Down Expand Up @@ -896,6 +945,10 @@ export class Loader implements Loadable {
// roots: [],
// }));
}
ensureLogger(this.sthis, "LoaderCarContent").Debug().Any({
carCid: carCidStr,
constent: blocks.map((b) => b.cid.toString()),
}).Msg("loaded-car");
return {
cid: carCid,
bytes: bytes.value.data,
Expand Down
6 changes: 3 additions & 3 deletions core/blockstore/register-store-protocol.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { BuildURI, ResolveOnce, runtimeFn, URI } from "@adviser/cement";
import { SuperThis, PARAM } from "@fireproof/core-types-base";
import { SerdeGateway, Gateway } from "@fireproof/core-types-blockstore";
import { SerdeGateway, Gateway, FPEnvelope } from "@fireproof/core-types-blockstore";
import { MemoryGateway } from "@fireproof/core-gateways-memory";
import { FileGateway, FILESTORE_VERSION, sysFileSystemFactory } from "@fireproof/core-gateways-file";
import { DefSerdeGateway, INDEXEDDB_VERSION } from "@fireproof/core-gateways-base";
Expand Down Expand Up @@ -165,14 +165,14 @@ if (runtimeFn().isBrowser) {
});
}

const memory = new Map<string, Uint8Array>();
const memory = new Map<string, FPEnvelope<unknown>>();
registerStoreProtocol({
protocol: "memory:",
isDefault: false,
defaultURI: () => {
return BuildURI.from("memory://").pathname("ram").URI();
},
gateway: async (sthis) => {
serdegateway: async (sthis) => {
return new MemoryGateway(sthis, memory);
},
});
Expand Down
1 change: 0 additions & 1 deletion core/blockstore/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore {
// console.log(
// "subscribe",
// this.myId,
// id.str,
// this.url().pathname,
// dbMetas.map((i) => i.cars.map((i) => i.toString())).flat(2),
// );
Expand Down
2 changes: 1 addition & 1 deletion core/gateways/base/fp-envelope-serialize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export async function dbMetaEvent2Serialized(
);
}

function WALState2Serialized(sthis: SuperThis, wal: WALState): SerializedWAL {
export function WALState2Serialized(sthis: SuperThis, wal: WALState): SerializedWAL {
const serializedWAL: SerializedWAL = {
fileOperations: wal.fileOperations.map((fop) => ({
cid: fop.cid.toString(),
Expand Down
6 changes: 3 additions & 3 deletions core/gateways/base/meta-key-hack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,10 @@ export function addKeyToDbMetaEncoder(ctx: SerdeGatewayCtx, version: "v1" | "v2"
}

export class AddKeyToDbMetaGateway implements SerdeGateway {
private readonly sdGw: DefSerdeGateway;
private readonly sdGw: SerdeGateway;
readonly version: "v1" | "v2";
constructor(gw: Gateway, version: "v1" | "v2") {
this.sdGw = new DefSerdeGateway(gw);
constructor(gw: SerdeGateway, version: "v1" | "v2") {
this.sdGw = gw;
this.version = version;
}

Expand Down
Loading