Initial public commit

This commit is contained in:
Tracy Rust 2024-01-31 17:56:19 -05:00
commit 5f6acc8837
8 changed files with 6983 additions and 0 deletions

165
LICENSE.txt Normal file

@ -0,0 +1,165 @@
GNU LESSER GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/>
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
This version of the GNU Lesser General Public License incorporates
the terms and conditions of version 3 of the GNU General Public
License, supplemented by the additional permissions listed below.
0. Additional Definitions.
As used herein, "this License" refers to version 3 of the GNU Lesser
General Public License, and the "GNU GPL" refers to version 3 of the GNU
General Public License.
"The Library" refers to a covered work governed by this License,
other than an Application or a Combined Work as defined below.
An "Application" is any work that makes use of an interface provided
by the Library, but which is not otherwise based on the Library.
Defining a subclass of a class defined by the Library is deemed a mode
of using an interface provided by the Library.
A "Combined Work" is a work produced by combining or linking an
Application with the Library. The particular version of the Library
with which the Combined Work was made is also called the "Linked
Version".
The "Minimal Corresponding Source" for a Combined Work means the
Corresponding Source for the Combined Work, excluding any source code
for portions of the Combined Work that, considered in isolation, are
based on the Application, and not on the Linked Version.
The "Corresponding Application Code" for a Combined Work means the
object code and/or source code for the Application, including any data
and utility programs needed for reproducing the Combined Work from the
Application, but excluding the System Libraries of the Combined Work.
1. Exception to Section 3 of the GNU GPL.
You may convey a covered work under sections 3 and 4 of this License
without being bound by section 3 of the GNU GPL.
2. Conveying Modified Versions.
If you modify a copy of the Library, and, in your modifications, a
facility refers to a function or data to be supplied by an Application
that uses the facility (other than as an argument passed when the
facility is invoked), then you may convey a copy of the modified
version:
a) under this License, provided that you make a good faith effort to
ensure that, in the event an Application does not supply the
function or data, the facility still operates, and performs
whatever part of its purpose remains meaningful, or
b) under the GNU GPL, with none of the additional permissions of
this License applicable to that copy.
3. Object Code Incorporating Material from Library Header Files.
The object code form of an Application may incorporate material from
a header file that is part of the Library. You may convey such object
code under terms of your choice, provided that, if the incorporated
material is not limited to numerical parameters, data structure
layouts and accessors, or small macros, inline functions and templates
(ten or fewer lines in length), you do both of the following:
a) Give prominent notice with each copy of the object code that the
Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the object code with a copy of the GNU GPL and this license
document.
4. Combined Works.
You may convey a Combined Work under terms of your choice that,
taken together, effectively do not restrict modification of the
portions of the Library contained in the Combined Work and reverse
engineering for debugging such modifications, if you also do each of
the following:
a) Give prominent notice with each copy of the Combined Work that
the Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the Combined Work with a copy of the GNU GPL and this license
document.
c) For a Combined Work that displays copyright notices during
execution, include the copyright notice for the Library among
these notices, as well as a reference directing the user to the
copies of the GNU GPL and this license document.
d) Do one of the following:
0) Convey the Minimal Corresponding Source under the terms of this
License, and the Corresponding Application Code in a form
suitable for, and under terms that permit, the user to
recombine or relink the Application with a modified version of
the Linked Version to produce a modified Combined Work, in the
manner specified by section 6 of the GNU GPL for conveying
Corresponding Source.
1) Use a suitable shared library mechanism for linking with the
Library. A suitable mechanism is one that (a) uses at run time
a copy of the Library already present on the user's computer
system, and (b) will operate properly with a modified version
of the Library that is interface-compatible with the Linked
Version.
e) Provide Installation Information, but only if you would otherwise
be required to provide such information under section 6 of the
GNU GPL, and only to the extent that such information is
necessary to install and execute a modified version of the
Combined Work produced by recombining or relinking the
Application with a modified version of the Linked Version. (If
you use option 4d0, the Installation Information must accompany
the Minimal Corresponding Source and Corresponding Application
Code. If you use option 4d1, you must provide the Installation
Information in the manner specified by section 6 of the GNU GPL
for conveying Corresponding Source.)
5. Combined Libraries.
You may place library facilities that are a work based on the
Library side by side in a single library together with other library
facilities that are not Applications and are not covered by this
License, and convey such a combined library under terms of your
choice, if you do both of the following:
a) Accompany the combined library with a copy of the same work based
on the Library, uncombined with any other library facilities,
conveyed under the terms of this License.
b) Give prominent notice with the combined library that part of it
is a work based on the Library, and explaining where to find the
accompanying uncombined form of the same work.
6. Revised Versions of the GNU Lesser General Public License.
The Free Software Foundation may publish revised and/or new versions
of the GNU Lesser General Public License from time to time. Such new
versions will be similar in spirit to the present version, but may
differ in detail to address new problems or concerns.
Each version is given a distinguishing version number. If the
Library as you received it specifies that a certain numbered version
of the GNU Lesser General Public License "or any later version"
applies to it, you have the option of following the terms and
conditions either of that published version or of any later version
published by the Free Software Foundation. If the Library as you
received it does not specify a version number of the GNU Lesser
General Public License, you may choose any version of the GNU Lesser
General Public License ever published by the Free Software Foundation.
If the Library as you received it specifies that a proxy can decide
whether future versions of the GNU Lesser General Public License shall
apply, that proxy's public statement of acceptance of any version is
permanent authorization for you to choose that version for the
Library.

4242
package-lock.json generated Normal file

File diff suppressed because it is too large.

20
package.json Normal file

@ -0,0 +1,20 @@
{
"name": "nucleus2",
"version": "1.0.0",
"description": "Nucleus2 is a key value database with support for asynchronous multi-master replication",
"main": "src/writerecord.js",
"type": "module",
"scripts": {
"test": "node --experimental-vm-modules node_modules/jest/bin/jest.js --verbose"
},
"repository": {
"type": "git",
"url": "https://src.enesda.com/labs/nucleus2"
},
"author": "Tracy Rust",
"license": "LGPL-3.0-only",
"devDependencies": {
"eslint": "^8.41.0",
"jest": "^29.5.0"
}
}

7
src/nucleus2.d.ts vendored Normal file

@ -0,0 +1,7 @@
export class Nucleus2 {
constructor(
nodeId: string,
logReader: (lastReadByMachine: Record<string, number>) => Promise<Record<string, {logs: Array<string>, readTo: number}>>,
logWriter: (records: Array<string>) => Promise<void>,
cacheReader: () => string,
cacheWriter: (cache: string) => void,
);
}
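The declaration only hints at the callback shapes. Judging from how src/nucleus2.js uses them, a minimal in-memory wiring might look like the following sketch; `makeInMemoryStorage` and the `"local"` machine id are hypothetical names, not part of the library.

```javascript
// Hypothetical in-memory adapters illustrating the callback shapes the
// Nucleus2 constructor expects (see src/nucleus2.js). Names are illustrative.
function makeInMemoryStorage() {
  const logs = [];  // append-only log lines for this node
  let cache = null; // last flushed cache snapshot (a JSON string)

  return {
    // logReader(lastReadByMachine) -> {machineId: {logs: [...], readTo: n}, ...}
    logReader: async (lastReadByMachine) => {
      const from = lastReadByMachine["local"] ?? 0;
      return { local: { logs: logs.slice(from), readTo: logs.length } };
    },
    // logWriter(batchedRecords) appends the stringified records
    logWriter: async (batchedRecords) => { logs.push(...batchedRecords); },
    // cacheReader() -> cache string, or a fresh empty cache if nothing is stored
    cacheReader: () =>
      cache ?? JSON.stringify({ tables: {}, indexes: {}, lastReadByMachine: {} }),
    // cacheWriter(str) persists the cache snapshot
    cacheWriter: (str) => { cache = str; },
  };
}
```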

565
src/nucleus2.js Normal file

@ -0,0 +1,565 @@
import {
assertGoodWriteRecord,
constructActiveProperty,
createRecord,
digestRecordIntoProperty, readPropertyString,
} from "./writerecord.js";
function constructCache() {
const obj = {};
obj.tables = {};
obj.indexes = {}; //Because it's the plural of an index as in a list, not as in indices
obj.lastReadByMachine = {}; //machineId -> last read value
return JSON.stringify(obj);
}
const keyDelimiter = ":";
//IMPORTANTLY: this can be called on the result and it will be the same;
// this allows us to use the composite key that is made internally
// with functions exposed to the external API
function makeCompositeKey(...keys) {
let result = "";
for(const k of keys) {
if(k.includes(keyDelimiter)) {
throw new Error(`Nucleus2: Invalid key, contains reserved character: ${keyDelimiter}`);
}
if(result === "")
result = k;
else
result = `${result}${keyDelimiter}${k}`
}
return result;
}
//key1 key2 key3
//up to: key3 is a value key
//up to: key2 is an index key
//up to: key1 is not much (majors?)
//example 2:
//key1 key2 key3 key4
//up to: key4 is a value key
//up to: key3 is an index key
//up to: key2 is not much (majors?)
//example 3:
//key1 key2
//up to: key2 is a value key
//up to: key1 is an index key
//key1
//up to: key1 is a value key
//there is no index key
function fractureKey(obsKey) {
//I kind of hate this, but what would a clean alternative even be?
// either this or passing around all the component keys and compositing
// them all the time which would be even more often than this...
const splitKey = obsKey.split(keyDelimiter);
if(splitKey.length === 0)
throw new Error("Nucleus2: error, composite key too short to split");
return splitKey;
}
//The last key on its own is called the "minor" key
function popKey(obsKey) {
const splitKey = fractureKey(obsKey);
const minorKey = splitKey.pop(); //come on errybody let's do the split key pop
//makeComposite just returns an empty key with an empty input
return [makeCompositeKey(...splitKey), minorKey];
}
class Nucleus2 {
#nodeId = "";
#cacheable = {}; // all the data we need to dump to disk and read back in to restore committed db state
#cacheFlushLock = false;
#syncLock = false;
#queued = {};
#flushAttemptedDuringFlush = false;
#commitLock = false;
#pendingCommit = {};
#propertyObservers = {}; // observerKey -> {index: callback}
#indexObservers = {};
#majorObservers = {};
#lastObserverId = 0;
#outdatedPropertyObservers = {};
#outdatedIndexObservers = {};
//Is it really this simple?
#logReader;
#logWriter;
//#cacheReader; //why would we need to read our own cache more than once?
#cacheWriter;
//Logreader is expected to read all logs of each node, returned in a list as such:
//{machine: {logs: [...], readTo: number}, ...}
constructor(nodeId, logReader, logWriter, cacheReader, cacheWriter) {
this.#nodeId = nodeId;
this.#logReader = logReader;
this.#logWriter = logWriter;
this.#cacheWriter = cacheWriter;
//Initialize cache, expected to return a newly constructed cache object
// if nothing is saved already.
this.#cacheable = JSON.parse(cacheReader());
}
#valueChanged(obsKey) {
const val = this.#getPropertyString(obsKey);
const [indexKey, minorKey] = popKey(obsKey);
let indexChanged = false;
if(val !== undefined) {
if(!(indexKey in this.#cacheable.indexes)) {
this.#cacheable.indexes[indexKey] = {};
}
this.#cacheable.indexes[indexKey][minorKey] = null;
indexChanged = true;
} else if(indexKey in this.#cacheable.indexes) { //val is undefined and the index exists
//remove it
if(minorKey in this.#cacheable.indexes[indexKey]) {
delete this.#cacheable.indexes[indexKey][minorKey];
indexChanged = true;
}
//Delete index too, if it's now empty
if(Object.keys(this.#cacheable.indexes[indexKey]).length === 0) {
delete this.#cacheable.indexes[indexKey];
//indexChanged should already be set here
}
}
this.#outdatedPropertyObservers[obsKey] = null;
if(indexChanged) {
this.#outdatedIndexObservers[indexKey] = null;
}
}
#alertDeferredObservers() {
for (const obsKey in this.#outdatedPropertyObservers) {
const val = this.#getPropertyString(obsKey);
let valToPropagate = undefined;
if (val !== undefined)
valToPropagate = JSON.parse(val);
for (const observerId in this.#propertyObservers[obsKey]) {
this.#propertyObservers[obsKey][observerId](valToPropagate, () => this.removePropertyObserver(...fractureKey(obsKey), observerId));
}
const observedPath = fractureKey(obsKey);
let parentKey = obsKey;
while(true) {
[parentKey, ] = popKey(parentKey);
//console.log("parent key: " + parentKey);
if(!parentKey) break;
if (!(parentKey in this.#majorObservers)) {
continue;
}
for(const observerId in this.#majorObservers[parentKey]) {
this.#majorObservers[parentKey][observerId](observedPath, valToPropagate, () => this.removeMajorObserver(...fractureKey(parentKey), observerId));
}
}
}
for (const indexKey in this.#outdatedIndexObservers) {
for(const observerId in this.#indexObservers[indexKey]) {
this.#indexObservers[indexKey][observerId](() => this.removeIndexObserver(...fractureKey(indexKey), observerId));
}
}
this.#outdatedPropertyObservers = {};
this.#outdatedIndexObservers = {};
}
async flushCache() {
if(this.#cacheFlushLock) {
throw new Error("Nucleus2: Can't flush cache with flush in progress");
}
this.#cacheFlushLock = true;
try {
await this.#cacheWriter(JSON.stringify(this.#cacheable));
} finally {
this.#cacheFlushLock = false; //release the lock even if the writer throws
}
}
#getPropertyString(obsKey) {
if(obsKey in this.#queued) {
const rec = this.#queued[obsKey];
if(rec.recordType === "delete") {
return undefined;
}
return rec.value;
}
if(obsKey in this.#pendingCommit) {
const rec = this.#pendingCommit[obsKey];
if(rec.recordType === "delete") {
return undefined;
}
return rec.value;
}
if(!(obsKey in this.#cacheable.tables))
return undefined;
//Read property value/string also returns undefined if the value is deleted :3
return readPropertyString(this.#cacheable.tables[obsKey]);
}
get(...args) {
if(args.length === 0)
throw new Error("Nucleus2: error must provide arguments to get");
const val = this.#getPropertyString(makeCompositeKey(...args));
if(val === undefined)
return undefined;
return JSON.parse(val);
}
checkCommitLock() {
return this.#commitLock;
}
checkSyncLock() {
return this.#syncLock;
}
set(...args) {
if(args.length === 0)
throw new Error("Nucleus2: Missing arguments to set");
if(args.length === 1)
throw new Error("Nucleus2: Missing value/key in set call");
const value = args.pop();
if(value === undefined) {
throw new Error("Nucleus2: cannot set value to undefined, use null or delete it");
}
const obsKey = makeCompositeKey(...args);
if(this.#getPropertyString(obsKey) === JSON.stringify(value)) //noop
return;
if(!(obsKey in this.#cacheable.tables)) {
this.#cacheable.tables[obsKey] = constructActiveProperty(this.#nodeId, obsKey);
}
const prop = this.#cacheable.tables[obsKey];
this.#queued[obsKey] = createRecord(prop, "write", value);
this.#valueChanged(obsKey);
this.#alertDeferredObservers();
}
del(...args) {
if(args.length === 0)
throw new Error("Nucleus2: error, must pass args to delete");
const obsKey = makeCompositeKey(...args);
if(this.#getPropertyString(obsKey) === undefined) //doesn't exist or was already deleted
return;
if(!(obsKey in this.#cacheable.tables)) //definitely doesn't exist
return;
const prop = this.#cacheable.tables[obsKey];
//It has to be made against the property and not the record in queue.
// and we don't want to digest that one until it's committed, so we just accept clashing
// machine indices for overwrites (they'll resolve safely with timestamps, since it's
// the same process)
this.#queued[obsKey] = createRecord(prop, "delete");
this.#valueChanged(obsKey);
this.#alertDeferredObservers();
}
async sync() {
//If we're committing, it's fine, the order of digestRecordIntoProperty calls is completely arbitrary :)
if(this.#syncLock) {
throw new Error("Nucleus2: Error, cannot sync with sync in progress!");
}
this.#syncLock = true;
const alertables = {};
let failedToFullyRead = false;
try {
//Do the things
//Logreader is expected to read all logs of each node, returned in a list as such:
//{machine: {logs: [...], readTo: number}, ...}
const logs = await this.#logReader(this.#cacheable.lastReadByMachine);
for(const machineId in logs) {
const logBlock = logs[machineId];
if(!("logs" in logBlock)) {
console.log("Nucleus2: log block missing logs field");
continue;
}
if(!("readTo" in logBlock)) {
console.log("Nucleus2: log block missing readTo field");
continue;
}
for(const record of logBlock.logs) {
assertGoodWriteRecord(record);
const obsKey = record.key;
//Store the current value the first time we encounter it
if(!(obsKey in alertables)) {
alertables[obsKey] = this.#getPropertyString(obsKey);
}
if(!(obsKey in this.#cacheable.tables)) {
this.#cacheable.tables[obsKey] = constructActiveProperty(
this.#nodeId, obsKey);
}
digestRecordIntoProperty(this.#cacheable.tables[obsKey], record);
}
//Set this after actually reading everything, in case there's a throw above,
// that way we don't get stuck. This is essentially a commit.
this.#cacheable.lastReadByMachine[machineId] = logBlock.readTo;
}
} catch(e) {
failedToFullyRead = true;
}
this.#syncLock = false;
//Not really necessary now that we've batched all alerts :3c
for(const obsKey in alertables) {
if(alertables[obsKey] !== this.#getPropertyString(obsKey)) {
this.#valueChanged(obsKey);
}
}
this.#alertDeferredObservers();
if(failedToFullyRead)
throw new Error("Nucleus2: Failed to fully read logs, might have read some or none");
}
async flushToPersistent() {
if(this.#commitLock) {
this.#flushAttemptedDuringFlush = true;
return;
//throw new Error("Nucleus2: Error, cannot flush with flush in progress!");
}
while(true) {
const alertables = {};
let failedToWrite = false;
this.#commitLock = true;
this.#pendingCommit = this.#queued;
this.#queued = {};
const batchedRecords = [];
for (const obsKey in this.#pendingCommit) {
const record = this.#pendingCommit[obsKey];
batchedRecords.push(JSON.stringify(record));
}
try {
//send it
await this.#logWriter(batchedRecords);
for (const obsKey in this.#pendingCommit) {
const record = this.#pendingCommit[obsKey];
if (!(obsKey in this.#cacheable.tables)) {
console.log("Nucleus2: warning, table not found.");
//I mean, you have to have a property to create a record, so
// this is like... a big deal.
continue;
}
const committedProp = this.#cacheable.tables[obsKey];
//We only have to reconcile removal of minor keys from the lists
// if it's committed, the listMinors function looks into the
// queued and pending to prune the result, if we just remove a
// failed record from there, then it's fine.
digestRecordIntoProperty(committedProp, record);
//assume observers already up to date, because optimistic writes
}
} catch (e) {
failedToWrite = true;
for (const obsKey in this.#pendingCommit) {
const record = this.#pendingCommit[obsKey];
if (obsKey in this.#queued) //already obsolete so just roll with it
continue;
let committedVal;
if (!(obsKey in this.#cacheable.tables)) {
console.log("Nucleus2: warning: table not found; this should never happen");
//Continue to alert anyway
committedVal = undefined;
} else {
committedVal = readPropertyString(this.#cacheable.tables[obsKey]);
}
if (committedVal !== record.value)
alertables[obsKey] = true;
}
}
this.#pendingCommit = {}; //wait until after the record is digested to reset pending
//AFTER pending is cleared
for (const obsKey in alertables) {
this.#valueChanged(obsKey);
}
this.#alertDeferredObservers();
this.#commitLock = false;
if (failedToWrite)
throw new Error("Nucleus2: failed to write to persistent, db is in recovered state");
if(this.#flushAttemptedDuringFlush) {
this.#flushAttemptedDuringFlush = false;
continue;
} else {
break;
}
}
}
//E.g. table: tagLinks, major: tagId, minor: objId
//list("tagLinks", "$tagId$") - > list of objects with tag
listMinors(...args) {
if(args.length === 0)
throw new Error("Nucleus2: error, must pass a key to list minors");
const minorsKey = makeCompositeKey(...args);
if(!(minorsKey in this.#cacheable.indexes))
return [];
return Object.keys(this.#cacheable.indexes[minorsKey]);
}
#observerRegistrationHelper(dest, ...args) {
if(args.length < 2) {
throw new Error("Nucleus2: error, invalid number of arguments to register observer");
}
}
const observer = args.pop();
const key = makeCompositeKey(...args);
const observerId = ++this.#lastObserverId;
if(!(key in dest)) {
dest[key] = {};
}
dest[key][observerId] = observer;
return observerId;
}
#observerRemovalHelper(location, ...args) {
if(args.length < 2) {
throw new Error("Nucleus2: error, invalid number of arguments to remove property observer");
}
const id = args.pop();
const key = makeCompositeKey(...args);
if(!(key in location))
return;
if(!(id in location[key]))
return;
delete location[key][id];
if(Object.keys(location[key]).length === 0) {
//Delete the whole observer list if empty
delete location[key];
}
}
//keys..., observer
registerPropertyObserver(...args) {
return this.#observerRegistrationHelper(this.#propertyObservers, ...args);
}
removePropertyObserver(...args) {
this.#observerRemovalHelper(this.#propertyObservers, ...args);
}
registerIndexObserver(...args) {
return this.#observerRegistrationHelper(this.#indexObservers, ...args);
}
removeIndexObserver(...args) {
this.#observerRemovalHelper(this.#indexObservers, ...args);
}
registerMajorObserver(...args) {
return this.#observerRegistrationHelper(this.#majorObservers, ...args);
}
removeMajorObserver(...args) {
this.#observerRemovalHelper(this.#majorObservers, ...args);
}
}
export default Nucleus2;
export {constructCache}
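The composite-key scheme above round-trips cleanly between `makeCompositeKey` and `popKey`. This standalone restatement of those two helpers (copied from the module, minus the class) shows the behavior in isolation:

```javascript
// Standalone restatement of the composite-key helpers above, showing that
// popKey splits off the trailing "minor" key that makeCompositeKey appended.
const keyDelimiter = ":";

function makeCompositeKey(...keys) {
  for (const k of keys) {
    // Component keys may not contain the delimiter, or the split would be ambiguous
    if (k.includes(keyDelimiter))
      throw new Error(`Invalid key, contains reserved character: ${keyDelimiter}`);
  }
  return keys.join(keyDelimiter);
}

function popKey(obsKey) {
  const parts = obsKey.split(keyDelimiter);
  const minorKey = parts.pop();
  // With a single-component key this yields ["", minorKey]: no index key exists
  return [makeCompositeKey(...parts), minorKey];
}
```

For example, `makeCompositeKey("tagLinks", "tag1", "obj7")` yields `"tagLinks:tag1:obj7"`, and `popKey` on that yields the index key `"tagLinks:tag1"` plus the minor key `"obj7"`, matching the key1/key2/key3 breakdown described in the comments above.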

1018
src/nucleus2.test.js Normal file

File diff suppressed because it is too large.

395
src/writerecord.js Normal file

@ -0,0 +1,395 @@
/*
class IncomparableRecords extends Error {
constructor(message) {
super(message);
this.name = this.constructor.name;
}
}
*/
//Used for loading as well as asserting construction was valid
//We are tolerant of extraneous fields, assuming that future versions of Nucleus
// that want to be backwards compatible might write extra data.
function assertGoodWriteRecord(obj) {
/* this makes testing difficult, and... isn't consistent, anyway
Object.freeze(obj);
if(!Object.isFrozen(obj))
throw new Error("Object must be frozen to be a valid write record");
*/
if(!("recordType" in obj))
throw new Error("Missing record type field");
if(!("timestamp" in obj))
throw new Error("Missing timestamp field");
if(typeof obj.timestamp !== "number")
throw new Error("Wrong type for timestamp");
if(!("key" in obj))
throw new Error("Missing key field");
if(typeof obj.key !== 'string')
throw new Error("Wrong type for key id");
if(!("machineId" in obj))
throw new Error("Missing machine id field");
if(typeof obj.machineId !== 'string')
throw new Error("Wrong type for machine id");
if(!("machineIndex" in obj))
throw new Error("Missing machine index");
if(typeof obj.machineIndex !== 'number')
throw new Error("Wrong type for machine index");
if(!("anchors" in obj))
throw new Error("Missing anchors");
if(typeof obj.anchors === 'object')
{
for(const key in obj.anchors)
{
if(typeof obj.anchors[key] !== 'number')
throw new Error("Wrong type for anchor machine index, expected number");
}
}
else
throw new Error("Wrong type for anchors object");
if(!("value" in obj))
throw new Error("Missing value");
if(typeof obj.value !== 'string')
throw new Error("Wrong type for value");
}
function constructActiveProperty(machineId, key) {
const obj = {};
obj.machineId = machineId;
obj.machineIndex = 0;
obj.key = key;
obj.anchors = {};
obj.topRecordByMachine = {};
assertGoodActiveProperty(obj);
return obj;
}
function assertGoodActiveProperty(property) {
if(!property)
throw new Error("Undefined property"); //lol I mean
if(!("machineId" in property))
throw new Error("Property missing machineId");
if(!("machineIndex" in property))
throw new Error("Property missing machineIndex");
if(!("key" in property))
throw new Error("Property missing key");
if(!("anchors" in property))
throw new Error("Property Missing anchors");
if(!("topRecordByMachine" in property))
throw new Error("Property Missing top record by machine field");
}
function digestRecordIntoProperty(property, record) {
assertGoodActiveProperty(property);
assertGoodWriteRecord(record);
if(record.machineId in property.topRecordByMachine) {
const committed = property.topRecordByMachine[record.machineId];
const superiorAnchors = {...record.anchors};
//merge anchors, regardless of machine index. Any witnessed higher anchor on this machine is valid :)
for(const machineId in committed.anchors) {
if(!(machineId in superiorAnchors)) {
superiorAnchors[machineId] = committed.anchors[machineId];
continue;
}
const comAnchor = committed.anchors[machineId];
const recAnchor = record.anchors[machineId];
if(comAnchor > recAnchor)
superiorAnchors[machineId] = comAnchor;
}
const sofitsticated = (a, b) => {
if(a.machineIndex > b.machineIndex)
return 1;
if(a.machineIndex < b.machineIndex)
return -1;
//Equal machine indices means we have a local conflict, which is resolved by timestamp,
// and then... yeah... main thing is to make sure it's consistent.
if(a.timestamp > b.timestamp)
return 1;
if(a.timestamp < b.timestamp)
return -1;
//Now we've gone and done it.
//The following isn't meant to be pretty, or logical. It's meant to be predictable.
if(a.recordType === "write" && b.recordType === "write") {
//Lexicographically compare the value as a tie-breaker, if it's the same then it
// really doesn't matter now, then does it?
//Notice: this is the stringified object, readPropertyValue is what parses it
return predictableStringComparison(a.value, b.value);
}
if(a.recordType === "delete" && b.recordType === "delete") {
return 0; //it really doesn't matter, but they are equal
}
//only thing left is a write AND a delete, the write takes precedence for arbitrary reasons
if(a.recordType === "write") {
return 1;
}
if(b.recordType !== "write" || a.recordType !== "delete") {
throw new Error("very bad logic error or something");
}
//:)
return -1;
}
const compResult = sofitsticated(record, committed);
//-1 means committed wins, so no-op, but potentially propagate the anchors
//0 is equal, so no-op for the value (but we still want to propagate the anchors)
if(compResult === 1)
{
//Then record is the preferred record.
property.topRecordByMachine[record.machineId] = record;
}
//Merge superior anchors regardless
property.topRecordByMachine[record.machineId].anchors = superiorAnchors;
} else {
//New machine
property.topRecordByMachine[record.machineId] = record;
}
//If we have witnessed a machine.index higher than this one, in the property,
// then it must have been written to property.anchors.machineId.index, and therefore
// we know if this record is new or not
//If it's equal or lower, then it's a local conflict and might have different anchors which
// need to propagate.
//if we made it past that check, then we're authorized to bump anchors, and erase
// any top records that have matching or lower indices in property.anchors
for(const machineId in record.anchors) {
//Because we delete records, and we might reference our old anchor (not supposed to,
// but it's harmless to be defensive)
if(machineId === record.machineId)
{
console.log("Nucleus2: Warning! record anchored to its own machine!");
continue;
}
const recordAnchorIndex = record.anchors[machineId];
/*
//We can only obsolete records that are anchored by the record we're digesting, right?
//Delete obsolete records
if(machineId in property.topRecordByMachine) {
if(property.topRecordByMachine[machineId].machineIndex <= recordAnchorIndex) {
delete property.topRecordByMachine[machineId];
}
}
*/
//bump property wide anchors
if(machineId in property.anchors) {
const propertyAnchorIndex = property.anchors[machineId];
if(propertyAnchorIndex >= recordAnchorIndex)
continue;
}
property.anchors[machineId] = recordAnchorIndex;
}
//If it's a local record then we bump the local machine index, which we own.
//No-op if local conflict
if(record.machineId === property.machineId) {
if(property.machineIndex < record.machineIndex) {
property.machineIndex = record.machineIndex;
}
} else {
//We have to set the property level anchor because records are not self-referential,
// but only if it's not the same machineId because we use these anchors to create
// new records and again, they should not be self referential
const prevAnchor = (() => {
if(record.machineId in property.anchors) {
return property.anchors[record.machineId];
}
return 0;
})();
if(prevAnchor < record.machineIndex) {
property.anchors[record.machineId] = record.machineIndex;
}
}
}
function predictableStringComparison(a, b) {
//I don't trust compareLocale
if(a === b) {
return 0;
}
if(a < b) {
return -1;
}
if(a > b) {
return 1;
}
throw new Error("That's a new one for sure");
}
function pickTopPropertyRecord(property) {
assertGoodActiveProperty(property);
const comparator = (a, b) => {
if(a.timestamp < b.timestamp)
return -1;
if(a.timestamp > b.timestamp)
return 1;
//equal timestamps
//Use machine id as tie-breaker
return predictableStringComparison(a.machineId, b.machineId);
}
const conflictingRecords = (() => {
const result = [];
const candidates = {...property.topRecordByMachine};
//console.log(JSON.stringify(candidates));
for(const key in candidates) {
const prop = candidates[key];
for(const anchoredMachineId in prop.anchors) {
if(!(anchoredMachineId in candidates))
continue;
const candidateMachineIndex = candidates[anchoredMachineId].machineIndex;
const ourAnchor = prop.anchors[anchoredMachineId];
if(candidateMachineIndex <= ourAnchor) {
delete candidates[anchoredMachineId];
}
}
}
//Go over what's left
for(const key in candidates) {
result.push(property.topRecordByMachine[key]);
}
return result;
})();
if(!conflictingRecords.length)
return null;
conflictingRecords.sort(comparator);
return conflictingRecords.at(-1);
}
function readPropertyString(property) {
assertGoodActiveProperty(property);
const winner = pickTopPropertyRecord(property);
if(winner === null)
return undefined; //because the property record defines the value
if(winner.recordType === "write")
return pickTopPropertyRecord(property).value;
else
return undefined; //I mean... it could be just... null for undefined and for deleted?
}
function readPropertyValue(property) {
const propertyString = readPropertyString(property);
if(propertyString !== undefined)
return JSON.parse(propertyString);
return undefined;
}
function createRecord(property, type, value = null) {
assertGoodActiveProperty(property);
if(type !== "write" && type !== "delete")
throw new Error("Record type must be either write or delete");
const obj = {};
obj["recordType"] = type;
obj["timestamp"] = Date.now();
obj["key"] = property.key;
obj["machineId"] = property.machineId;
//How do we increment? safely in case of failed commits?
obj["machineIndex"] = property.machineIndex + 1;
//Absolutely has to be a deep copy!!!!!!
//It's a smol object anyway.
obj["anchors"] = JSON.parse(JSON.stringify(property.anchors)); // need a list of highest machine indices...
//Shouldn't be necessary but...
delete obj["anchors"][property.machineId];
obj["value"] = JSON.stringify(value);
assertGoodWriteRecord(obj);
return obj;
}
export {createRecord,
readPropertyValue,
readPropertyString,
digestRecordIntoProperty,
constructActiveProperty,
assertGoodWriteRecord,
}
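The same-machine tie-break order buried in digestRecordIntoProperty can be restated as a standalone comparator. This sketch mirrors the ordering above (higher machine index wins, then later timestamp, then a deterministic content comparison where a write beats a delete); `compareSameMachineRecords` is an illustrative name, not an export of the module.

```javascript
// Standalone restatement of the tie-break used when two records from the
// same machine collide: machineIndex, then timestamp, then record contents.
function compareSameMachineRecords(a, b) {
  // Higher machine index is strictly newer
  if (a.machineIndex !== b.machineIndex)
    return a.machineIndex > b.machineIndex ? 1 : -1;
  // Equal indices mean a local conflict, resolved by timestamp
  if (a.timestamp !== b.timestamp)
    return a.timestamp > b.timestamp ? 1 : -1;
  // Full tie: fall back to a predictable content comparison
  if (a.recordType === "write" && b.recordType === "write")
    return a.value === b.value ? 0 : (a.value > b.value ? 1 : -1);
  if (a.recordType === "delete" && b.recordType === "delete")
    return 0;
  // One write, one delete: the write wins, for arbitrary but stable reasons
  return a.recordType === "write" ? 1 : -1;
}
```

The point of the final branches is not correctness of any particular winner but determinism: every replica that sees the same two records must pick the same one.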

571
src/writerecord.test.js Normal file

@ -0,0 +1,571 @@
import {createRecord, readPropertyValue, digestRecordIntoProperty, constructActiveProperty} from "./writerecord.js";
test('Basic checks, single machine', () => {
const prop = constructActiveProperty("eeee", "eeee", "eeeee", "3");
const rec = createRecord(prop, "write", 42);
digestRecordIntoProperty(prop, rec);
expect(readPropertyValue(prop)).toBe(42);
//Each new record has to be created after the previous one is digested; we can't
// create a batch of records up front, because their indices are maintained by
// the property object.
const rec2 = createRecord(prop, "write", 69);
digestRecordIntoProperty(prop, rec2);
expect(readPropertyValue(prop)).toBe(69);
});
test("reading property of unset record is undefined", () => {
const prop = constructActiveProperty("ee", "eee", "eeee", "3");
expect(readPropertyValue(prop)).toBe(undefined);
});
test("record can be deleted", () => {
const prop = constructActiveProperty("ei", "ei", "o", "e");
const rec1 = createRecord(prop, "write", 42);
digestRecordIntoProperty(prop, rec1);
expect(readPropertyValue(prop)).toBe(42);
const rec2del = createRecord(prop, "delete");
digestRecordIntoProperty(prop, rec2del);
expect(readPropertyValue(prop)).toBe(undefined);
});
test("Record digestion is idempotent", () => {
const prop = constructActiveProperty("eeee", "eeee", "eeeee", "e");
const rec = createRecord(prop, "write", 42);
for(let i = 0; i < 9; i++)
digestRecordIntoProperty(prop, rec);
expect(readPropertyValue(prop)).toBe(42);
//As above: each new record must be created after the previous one is digested.
const rec2 = createRecord(prop, "write", 69);
digestRecordIntoProperty(prop, rec2);
expect(readPropertyValue(prop)).toBe(69);
});
describe("conflicting machine index resolves gracefully", () => {
let prop;
let rec1;
let rec2;
beforeEach(() => {
prop = constructActiveProperty("ee", "eee", "eeee", "3");
//Conflicting machine indices because both records are based on the same current property state
rec1 = createRecord(prop, "write", 42);
rec2 = createRecord(prop, "write", 69); //later timestamp
});
test('record one then two', () => {
digestRecordIntoProperty(prop, rec1);
digestRecordIntoProperty(prop, rec2);
});
//Set it up to fail by passing the newer one first, proving it's not just picking
// based on order.
test('record two, then one', () => {
digestRecordIntoProperty(prop, rec2);
digestRecordIntoProperty(prop, rec1);
});
afterEach(() => {
expect(readPropertyValue(prop)).toBe(69);
})
});
describe("conflicting timestamp and machine id resolution", () => {
let prop;
let rec1;
let rec2;
beforeEach(() => {
prop = constructActiveProperty("ee", "eye", "ee", "eye");
});
test("write takes precedence over delete, all things being equal", () => {
rec1 = createRecord(prop, "delete", null);
rec2 = createRecord(prop, "write", 69);
});
test("two conflicting writes, lexicographically superior value takes precedence", () => {
rec1 = createRecord(prop, "write", 69);
rec2 = createRecord(prop, "write", 42);
});
//We don't test the same value or two simultaneous deletes... for hopefully obvious reasons :)
afterEach(() => {
rec1.timestamp = 0;
rec2.timestamp = 0;
//JSON round-trip as a quick deep copy: correctness matters more than speed here,
// and the object is small enough for a test.
const propAlt = JSON.parse(JSON.stringify(prop));
//Original order
digestRecordIntoProperty(prop, rec1);
digestRecordIntoProperty(prop, rec2);
expect(readPropertyValue(prop)).toBe(69);
//REORDER to make sure it's not order dependent
digestRecordIntoProperty(propAlt, rec2);
digestRecordIntoProperty(propAlt, rec1); //!!
expect(readPropertyValue(propAlt)).toBe(69);
});
});
describe("Diverging anchors from conflicting indices are properly bumped", () => {
let prop1;
let prop1alt;
let prop2;
let rec1;
let rec2;
let rec3;
const firstValue = "far off future soft lock value";
const altValue = "value which should've been superseded by prop1";
const finalValue = ":3c";
//This lets us bump the anchor off of an adjacent process. It's an edge case, but
// the right thing to do is assume that we, the node, are aware of the other node's
// value, meaning it is in the past and this record should absolutely supersede it.
beforeEach(() => {
prop1 = constructActiveProperty("machine1", "p", "p", "e");
prop1alt = constructActiveProperty("machine1", "p", "p", "e");
prop2 = constructActiveProperty("machine2", "p", "p", "di");
//Setup: write to prop2, sync prop1alt against it, then make conflicting writes to
// prop1 and prop1alt. prop1 doesn't know about the record written to prop2, but
// prop1alt does, so prop1alt's write should not conflict with it.
//The catch is that prop2's record is timestamped far into the future: in a plain
// timestamp conflict it would be preferred. But prop1alt carries an anchor for
// prop2, so its write supersedes that bad timestamp, and prop1's write, with the
// latest timestamp among the machine1 writes, should win overall.
rec1 = createRecord(prop2, "write", firstValue);
rec1.timestamp = 69; //yeet into the future
//Make prop1alt aware of the write that happened to prop2
digestRecordIntoProperty(prop1alt, rec1);
rec2 = createRecord(prop1alt, "write", altValue);
rec2.timestamp = 0;
rec3 = createRecord(prop1, "write", finalValue);
rec3.timestamp = 42; //ahead of record 2, but behind record 1.
});
//The only record digested during setup is rec1, into prop1alt.
test("Order 1 (1, 2, 3)", () => {
digestRecordIntoProperty(prop1, rec1);
digestRecordIntoProperty(prop1, rec2);
digestRecordIntoProperty(prop1, rec3);
digestRecordIntoProperty(prop1alt, rec1);
digestRecordIntoProperty(prop1alt, rec2);
digestRecordIntoProperty(prop1alt, rec3);
digestRecordIntoProperty(prop2, rec1);
digestRecordIntoProperty(prop2, rec2);
digestRecordIntoProperty(prop2, rec3);
});
test("Order 2 (3, 2, 1)", () => {
digestRecordIntoProperty(prop1, rec3);
digestRecordIntoProperty(prop1, rec2);
digestRecordIntoProperty(prop1, rec1);
digestRecordIntoProperty(prop1alt, rec3);
digestRecordIntoProperty(prop1alt, rec2);
digestRecordIntoProperty(prop1alt, rec1);
digestRecordIntoProperty(prop2, rec3);
digestRecordIntoProperty(prop2, rec2);
digestRecordIntoProperty(prop2, rec1);
});
test("Order 3 (2, 3, 1)", () => {
digestRecordIntoProperty(prop1, rec2);
digestRecordIntoProperty(prop1, rec3);
digestRecordIntoProperty(prop1, rec1);
digestRecordIntoProperty(prop1alt, rec2);
digestRecordIntoProperty(prop1alt, rec3);
digestRecordIntoProperty(prop1alt, rec1);
digestRecordIntoProperty(prop2, rec2);
digestRecordIntoProperty(prop2, rec3);
digestRecordIntoProperty(prop2, rec1);
});
//maybe overkill but.... then again...
test("Order 4 (1, 3, 2)", () => {
digestRecordIntoProperty(prop1, rec1);
digestRecordIntoProperty(prop1, rec3);
digestRecordIntoProperty(prop1, rec2);
digestRecordIntoProperty(prop1alt, rec1);
digestRecordIntoProperty(prop1alt, rec3);
digestRecordIntoProperty(prop1alt, rec2);
digestRecordIntoProperty(prop2, rec1);
digestRecordIntoProperty(prop2, rec3);
digestRecordIntoProperty(prop2, rec2);
});
afterEach(() => {
expect(readPropertyValue(prop1)).toBe(finalValue);
expect(readPropertyValue(prop1alt)).toBe(finalValue);
expect(readPropertyValue(prop2)).toBe(finalValue);
});
});
describe("Diverging anchors from conflicting machine Ids with non-conflicting indices are properly bumped", () => {
let prop;
let rec1;
let rec2;
let rec3;
let rec4;
const firstValue = "far off future soft lock value";
const altValue = "value which should've been superseded by prop1";
const altValue2 = "rerererere";
const finalValue = ":3c";
//Similar to the previous test, but here prop1 is ahead of prop1alt, so their
// indices don't conflict and prop1's write *should* be an obvious win. prop1alt
// still has newer anchors, though, and those need to be bumped.
beforeEach(() => {
//Recreate prop per test so digested records can't leak between test cases.
prop = constructActiveProperty("machine3", "p", "p", "p");
const prop1 = constructActiveProperty("machine1", "p", "p", "e");
const prop1alt = constructActiveProperty("machine1", "p", "p", "e");
const prop2 = constructActiveProperty("machine2", "p", "p", "di");
rec1 = createRecord(prop2, "write", firstValue);
rec1.timestamp = 69; //yeet into the future
//Make prop1alt aware of the write that happened to prop2
digestRecordIntoProperty(prop1alt, rec1);
rec2 = createRecord(prop1alt, "write", altValue);
rec2.timestamp = 0;
rec3 = createRecord(prop1, "write", altValue2);
//timestamp shouldn't enter into it because we have indices on our side
digestRecordIntoProperty(prop1, rec3);
rec4 = createRecord(prop1, "write", finalValue);
rec4.timestamp = 42; //ahead of record 2, but behind record 1.
});
test("Order 1 (1, 2, 3, 4)", () => {
digestRecordIntoProperty(prop, rec1);
digestRecordIntoProperty(prop, rec2);
digestRecordIntoProperty(prop, rec3);
digestRecordIntoProperty(prop, rec4);
});
test("Order 2 (4, 3, 2, 1)", () => {
digestRecordIntoProperty(prop, rec4);
digestRecordIntoProperty(prop, rec3);
digestRecordIntoProperty(prop, rec2);
digestRecordIntoProperty(prop, rec1);
});
test("Order 3 (3, 4, 2, 1)", () => {
digestRecordIntoProperty(prop, rec3);
digestRecordIntoProperty(prop, rec4);
digestRecordIntoProperty(prop, rec2);
digestRecordIntoProperty(prop, rec1);
});
test("Order 4 (3, 4, 1, 2)", () => {
digestRecordIntoProperty(prop, rec3);
digestRecordIntoProperty(prop, rec4);
digestRecordIntoProperty(prop, rec1);
digestRecordIntoProperty(prop, rec2);
});
//let's not test all the permutations this time... that should be good.
afterEach(() => {
expect(readPropertyValue(prop)).toBe(finalValue);
});
});
test("Old records do nothing when re-applied", () => {
const prop = constructActiveProperty("eeee", "eeee", "eeeee", "e");
const rec = createRecord(prop, "write", 42);
digestRecordIntoProperty(prop, rec);
expect(readPropertyValue(prop)).toBe(42);
const rec2 = createRecord(prop, "write", 69);
digestRecordIntoProperty(prop, rec2);
digestRecordIntoProperty(prop, rec);
expect(readPropertyValue(prop)).toBe(69);
});
test('conflict resolution two machines, parallel writes', () => {
const prop1 = constructActiveProperty("machine1", "p", "p", "e");
const prop2 = constructActiveProperty("machine2", "p", "p", "di");
//Parallel, createRecord creates a timestamp, so the order we call it in is crucial for the test
const rec1 = createRecord(prop1, "write", 69);
const rec2 = createRecord(prop2, "write", 42);
digestRecordIntoProperty(prop1, rec1);
digestRecordIntoProperty(prop1, rec2);
digestRecordIntoProperty(prop2, rec2);
digestRecordIntoProperty(prop2, rec1);
//should both be 42 :3
expect(readPropertyValue(prop1)).toBe(42);
expect(readPropertyValue(prop2)).toBe(42);
});
test("record isn't yeeted into the past", () => {
const prop1 = constructActiveProperty("machine1", "p", "p", "d");
const prop2 = constructActiveProperty("machine2", "p", "p", "d");
const rec1 = createRecord(prop1, "write", 1);
digestRecordIntoProperty(prop1, rec1);
digestRecordIntoProperty(prop2, rec1);
const pastRec = createRecord(prop1, "write", 42);
pastRec.timestamp = 0; //jan first 1970, bb
digestRecordIntoProperty(prop1, pastRec);
digestRecordIntoProperty(prop2, pastRec);
expect(readPropertyValue(prop1)).toBe(42);
expect(readPropertyValue(prop2)).toBe(42);
});
test("future write record can be superseded and doesn't soft lock property", () => {
const prop1 = constructActiveProperty("machine1", "p", "p", "d");
const prop2 = constructActiveProperty("machine2", "p", "p", "d");
const rec1 = createRecord(prop1, "write", 1);
//~24 hours into the future, we assume it won't take that long for the rest of the test to execute
rec1.timestamp += 1000*60*60*24;
//Digest property otherwise we can't create a record that can shadow it (because the property
// has to record the index)
digestRecordIntoProperty(prop1, rec1);
digestRecordIntoProperty(prop2, rec1);
const rec2 = createRecord(prop1, "write", 42);
//rec2 has an *older* timestamp; if we relied on timestamps alone,
// the property would be soft-locked for 24 hours
digestRecordIntoProperty(prop1, rec2);
digestRecordIntoProperty(prop2, rec2);
expect(readPropertyValue(prop1)).toBe(42);
expect(readPropertyValue(prop2)).toBe(42);
});
//This is such an important test
//This one is different from the "yeeted into the past" test because that one is
// testing update without conflict, this one is conflicted.
test("conflict aware record supersedes timestamp resolution, regardless of timestamp", () => {
const prop1 = constructActiveProperty("machine1", "p", "p", "d");
const prop2 = constructActiveProperty("machine2", "p", "p", "d");
//Parallel, createRecord creates a timestamp, so the order we call it in is crucial for the test
const rec1 = createRecord(prop1, "write", 69);
const rec2 = createRecord(prop2, "write", 42);
digestRecordIntoProperty(prop1, rec1);
digestRecordIntoProperty(prop1, rec2);
digestRecordIntoProperty(prop2, rec2);
digestRecordIntoProperty(prop2, rec1);
//For completeness this would also be tested by creating the final record on both
// props, but one side is enough here.
const recFinal = createRecord(prop1, "write", 420);
//Now it's a past record. With the above being conflict resolution based on timestamp,
// this proves the system is capable of overcoming timestamp resolution.
recFinal.timestamp = 0;
digestRecordIntoProperty(prop1, recFinal);
digestRecordIntoProperty(prop2, recFinal);
expect(readPropertyValue(prop1)).toBe(420);
expect(readPropertyValue(prop2)).toBe(420);
});
test("estranged node returns", () => {
const prop1 = constructActiveProperty("machine1", "p", "p", "d");
const prop2 = constructActiveProperty("machine2", "p", "p", "d");
const prop3 = constructActiveProperty("machine3", "p", "p", "d");
//We're testing an estranged node holding an uncommitted write that doesn't
// reference the newer writes but carries a newer timestamp. The other tests give
// us confidence that the synced value is consistent across all nodes that have
// read every write record, so those nodes should supersede old uncommitted writes
// with ancient timestamps and indices. This is largely a sanity check on top of that.
const rec1 = createRecord(prop1, "write", 1);
digestRecordIntoProperty(prop1, rec1);
digestRecordIntoProperty(prop2, rec1);
digestRecordIntoProperty(prop3, rec1);
const rec2 = createRecord(prop2, "write", 2);
digestRecordIntoProperty(prop1, rec2);
digestRecordIntoProperty(prop2, rec2);
const rec3 = createRecord(prop1, "write", 3);
digestRecordIntoProperty(prop1, rec3);
digestRecordIntoProperty(prop2, rec3);
//Without bringing node 3 into parity first
//This will create a conflict, but it should resolve in favor of record 4
// due to timestamps. I don't think it's that interesting to test for it to
// be overwritten if the timestamp is earlier, because that's what other tests
// are for.
const rec4 = createRecord(prop3, "write", 42);
digestRecordIntoProperty(prop1, rec4);
digestRecordIntoProperty(prop2, rec4);
digestRecordIntoProperty(prop3, rec4);
expect(readPropertyValue(prop1)).toBe(42);
expect(readPropertyValue(prop2)).toBe(42);
expect(readPropertyValue(prop3)).toBe(42);
});
test("conflicting timestamps resolve consistently", () => {
const prop1 = constructActiveProperty("machine1", "p", "p", "d");
const prop2 = constructActiveProperty("machine2", "p", "p", "d");
//Parallel, createRecord creates a timestamp, so the order we call it in is crucial for the test
const rec1 = createRecord(prop1, "write", 69);
const rec2 = createRecord(prop2, "write", 42);
//identical timestamps
rec1.timestamp = 1;
rec2.timestamp = 1;
//Applied in opposite orders
digestRecordIntoProperty(prop1, rec1);
digestRecordIntoProperty(prop1, rec2);
digestRecordIntoProperty(prop2, rec2);
digestRecordIntoProperty(prop2, rec1);
//machine2's id is lexicographically greater, so its record wins the tie.
expect(readPropertyValue(prop1)).toBe(42);
expect(readPropertyValue(prop2)).toBe(42);
});
describe("Arbitrary json objects work right", () => {
let anArbitraryJsonObject;
let copiedOut;
beforeEach(() => {
anArbitraryJsonObject = {
whee: {
key: "three",
blee: "blee!",
},
sixtyNine: 69,
}
const prop = constructActiveProperty("machine1", "p", "p", "d");
const rec = createRecord(prop, "write", anArbitraryJsonObject);
digestRecordIntoProperty(prop, rec);
copiedOut = readPropertyValue(prop);
});
test("Returned object is correct", () => {
expect("whee" in copiedOut).toBeTruthy();
expect(copiedOut.sixtyNine).toBe(69);
expect(copiedOut.whee.key).toBe("three");
expect(copiedOut.whee.blee).toBe("blee!");
});
test("Returned object is not the same copy as passed to createRecord", () => {
anArbitraryJsonObject.sixtyNine = 42;
expect(copiedOut.sixtyNine).toBe(69);
});
});
/* todo
test("One machine can skip write records and still work the same in the end", () => {
});
test("Record digestion is order independent", () => {
//criss cross records from two other nodes here
});
*/
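The anchor mechanism that the "conflict aware" tests above lean on can be illustrated in isolation. The helper below, `anchorsSupersede`, is a hypothetical name for a sketch of the rule, not a function from the module: if record `a`'s anchors show it has already seen machine `b`'s record, `a` supersedes `b` regardless of what the timestamps say.

```javascript
//Minimal sketch (assumption: the real digest logic is richer than this) of the
// conflict-aware rule: a record that has anchored another machine's record at or
// beyond its index supersedes it, no timestamps consulted.
function anchorsSupersede(a, b) {
  const seen = a.anchors[b.machineId];
  return seen !== undefined && seen >= b.machineIndex;
}

//A record yeeted into the future by a bad clock...
const futureRec = {machineId: "m2", machineIndex: 1, timestamp: 9999, anchors: {}};
//...loses to a record that has seen it, despite the far older timestamp.
const awareRec = {machineId: "m1", machineIndex: 1, timestamp: 0, anchors: {m2: 1}};
console.log(anchorsSupersede(awareRec, futureRec)); // true
```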