Skip to content

Commit

Permalink
Change how heads are stored, add migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
haadcode committed Nov 29, 2024
1 parent 4fccfda commit ac2b77e
Show file tree
Hide file tree
Showing 34 changed files with 204 additions and 98 deletions.
120 changes: 69 additions & 51 deletions benchmarks/orbitdb-documents.js
Original file line number Diff line number Diff line change
@@ -1,68 +1,86 @@
import { createOrbitDB } from '../src/index.js'
// import { createOrbitDB, MemoryStorage } from '../src/index.js'
import { rimraf as rmrf } from 'rimraf'
import createHelia from '../test/utils/create-helia.js'
// import { MemoryStorage, LevelStorage, LRUStorage } from '../src/storage/index.js'
import { rimraf as rmrf } from 'rimraf'

import { EventEmitter } from 'events'
EventEmitter.defaultMaxListeners = 10000
let db
let interval

// Metrics
let totalQueries = 0
let seconds = 0
let queriesPerSecond = 0
let lastTenSeconds = 0

// Settings
const benchmarkDuration = 20 // seconds

// One benchmark iteration: insert a unique document into the global `db`,
// bump the throughput counters, then re-schedule itself via setImmediate
// for as long as the metrics `interval` timer is still active.
const queryLoop = async () => {
const doc = { _id: 'id-' + totalQueries, content: 'hello ' + totalQueries }
// await db.put(totalQueries.toString(), { referencesCount: 0 })
await db.put(doc)
totalQueries++
lastTenSeconds++
queriesPerSecond++
// `interval` is set to null when the benchmark duration elapses; stop looping then.
if (interval) {
setImmediate(queryLoop)
}
}

;(async () => {
console.log('Starting benchmark...')

const entryCount = 1000
console.log('Benchmark duration is ' + benchmarkDuration + ' seconds')

await rmrf('./ipfs')
await rmrf('./orbitdb')

// const identities = await Identities()
// const testIdentity = await identities.createIdentity({ id: 'userA' })

const ipfs = await createHelia()
const orbitdb = await createOrbitDB({ ipfs })

console.log(`Insert ${entryCount} documents`)

// MemoryStorage is the default storage for Log but defining them here
// in case we want to benchmark different storage modules
// const entryStorage = await MemoryStorage()
// const headsStorage = await MemoryStorage()
// const indexStorage = await MemoryStorage()

// const db1 = await orbitdb.open('benchmark-documents', { type: 'documents', referencesCount: 16, entryStorage, headsStorage, indexStorage })

const db1 = await orbitdb.open('benchmark-documents', { type: 'documents' })

const startTime1 = new Date().getTime()

for (let i = 0; i < entryCount; i++) {
const doc = { _id: i.toString(), message: 'hello ' + i }
await db1.put(doc)
}

const endTime1 = new Date().getTime()
const duration1 = endTime1 - startTime1
const operationsPerSecond1 = Math.floor(entryCount / (duration1 / 1000))
const millisecondsPerOp1 = duration1 / entryCount
console.log(`Inserting ${entryCount} documents took ${duration1} ms, ${operationsPerSecond1} ops/s, ${millisecondsPerOp1} ms/op`)

console.log(`Query ${entryCount} documents`)
const startTime2 = new Date().getTime()

const all = []
for await (const { key, value } of db1.iterator()) {
all.unshift({ key, value })
}

const endTime2 = new Date().getTime()
const duration2 = endTime2 - startTime2
const operationsPerSecond2 = Math.floor(entryCount / (duration2 / 1000))
const millisecondsPerOp2 = duration2 / entryCount

console.log(`Querying ${all.length} documents took ${duration2} ms, ${operationsPerSecond2} ops/s, ${millisecondsPerOp2} ms/op`)

await db1.drop()
await db1.close()

await orbitdb.stop()
await ipfs.stop()

await rmrf('./ipfs')
await rmrf('./orbitdb')

process.exit(0)
// Test LRUStorage
// const entryStorage = await LRUStorage()
// const headsStorage = await LRUStorage()
// const indexStorage = await LRUStorage()
// Test LevelStorage
// const entryStorage = await LevelStorage({ path: './logA/entries' })
// const headsStorage = await LevelStorage({ path: './orbitdb/benchmark-documents-2/heads', valueEncoding: 'json' })
// const headsStorage = await LevelStorage({ path: './orbitdb/benchmark-documents-2/heads' })
// const indexStorage = await LevelStorage({ path: './logA/index' })

// db = await orbitdb.open('benchmark-documents-2', { type: 'documents', entryStorage, headsStorage, indexStorage })
db = await orbitdb.open('benchmark-documents-2', { type: 'documents' })

// Output metrics at 1 second interval
interval = setInterval(async () => {
seconds++
console.log(`${queriesPerSecond} queries per second, ${totalQueries} queries in ${seconds} seconds`)
queriesPerSecond = 0

if (seconds % 10 === 0) {
console.log(`--> Average of ${lastTenSeconds / 10} q/s in the last 10 seconds`)
if (lastTenSeconds === 0) throw new Error('Problems!')
lastTenSeconds = 0
}

if (seconds >= benchmarkDuration) {
clearInterval(interval)
interval = null
process.nextTick(async () => {
await db.close()
await orbitdb.stop()
await rmrf('./orbitdb')
process.exit(0)
}, 1000)
}
}, 1000)

setImmediate(queryLoop)
})()
52 changes: 35 additions & 17 deletions src/oplog/heads.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,13 @@ import MemoryStorage from '../storage/memory.js'

const DefaultStorage = MemoryStorage

const Heads = async ({ storage, heads }) => {
const Heads = async ({ storage, heads, entryStorage }) => {
storage = storage || await DefaultStorage()

const put = async (heads) => {
heads = findHeads(heads)
for (const head of heads) {
await storage.put(head.hash, head.bytes)
}
}

const set = async (heads) => {
await storage.clear()
await put(heads)
const headHashes = heads.map(e => e.hash)
await storage.put('heads', JSON.stringify(headHashes))
}

const add = async (head) => {
Expand All @@ -30,22 +24,25 @@ const Heads = async ({ storage, heads }) => {
return
}
const newHeads = findHeads([...currentHeads, head])
await set(newHeads)
await put(newHeads)

return newHeads
}

const remove = async (hash) => {
const currentHeads = await all()
const newHeads = currentHeads.filter(e => e.hash !== hash)
await set(newHeads)
await put(newHeads)
}

/**
 * Async generator over the current head entries.
 * Reads the JSON-encoded array of head hashes from `storage` under the
 * single 'heads' key, then resolves each hash to its full entry bytes via
 * `entryStorage` and decodes it.
 *
 * Bug fix: the fallback for a missing 'heads' record was the array `[]`,
 * which JSON.parse coerces to the empty string '' and rejects with a
 * SyntaxError. Fall back to the JSON string '[]' instead.
 */
const iterator = async function * () {
  const headHashes = JSON.parse(await storage.get('heads') || '[]')
  for (const hash of headHashes) {
    // Heads storage keeps only hashes; the entry bytes live in entryStorage.
    const entry = await entryStorage.get(hash)
    if (entry) {
      const head = await Entry.decode(entry)
      yield head
    }
  }
}

Expand All @@ -66,11 +63,32 @@ const Heads = async ({ storage, heads }) => {
}

// Initialize the heads if given as parameter
await put(heads || [])
if (heads) {
await put(heads)
}

// Migrate from 2.4.3 -> 2.5.0
// Pre-2.5.0 heads databases stored one record per head, keyed by the entry
// hash. From 2.5.0 on, a single 'heads' record holds a JSON array of head
// hashes. Detect the old layout (any key other than 'heads') and rewrite
// the storage in the new single-record format.
const migrate1 = async () => {
const it_ = storage.iterator()
const values = []
for await (const [hash] of it_) {
// Skip the new-format key so an already-migrated store is left alone.
if (hash !== 'heads') {
values.push(hash)
}
}
if (values.length > 0) {
console.log('Migrate pre v2.5.0 heads database')
console.log('Heads:', values)
// Drop the old per-head records and write the consolidated hash list.
// NOTE(review): this discards the entry bytes previously stored here;
// assumes the full entries are also available from entryStorage — confirm.
await storage.clear()
await storage.put('heads', JSON.stringify(values))
}
}

await migrate1()

return {
put,
set,
set: put,
add,
remove,
iterator,
Expand Down
8 changes: 4 additions & 4 deletions src/oplog/log.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
// Heads storage
headsStorage = headsStorage || await DefaultStorage()
// Add heads to the state storage, ie. init the log state
const _heads = await Heads({ storage: headsStorage, heads: logHeads })
const _heads = await Heads({ storage: headsStorage, heads: logHeads, entryStorage: _entries })
// Conflict-resolution sorting function
sortFn = NoZeroes(sortFn || LastWriteWins)
// Internal queues for processing appends and joins in their call-order
Expand Down Expand Up @@ -184,10 +184,10 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
throw new Error(`Could not append entry:\nKey "${identity.hash}" is not allowed to write to the log`)
}

// The appended entry is now the latest head
await _heads.set([entry])
// Add entry to the entry storage
await _entries.put(entry.hash, entry.bytes)
// The appended entry is now the latest head
await _heads.set([entry])
// Add entry to the entry index
await _index.put(entry.hash, true)
// Return the appended entry
Expand Down Expand Up @@ -315,7 +315,7 @@ const Log = async (identity, { logId, logHeads, access, entryStorage, headsStora
/* 6. Add new entry to entries (for pinning) */
await _entries.put(entry.hash, entry.bytes)

/* 6. Add the new entry to heads (=union with current heads) */
/* 7. Add the new entry to heads (=union with current heads) */
await _heads.add(entry)

return true
Expand Down
13 changes: 11 additions & 2 deletions src/storage/ipfs-block.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { base58btc } from 'multiformats/bases/base58'
import { TimeoutController } from 'timeout-abort-controller'
import drain from 'it-drain'

const DefaultTimeout = 30000 // 30 seconds
const DefaultTimeout = 1000 // 1 second — NOTE(review): comment said "30 seconds" but the value is 1000 ms; confirm the reduction from 30000 is intentional

/**
* Creates an instance of IPFSBlockStorage.
Expand All @@ -28,6 +28,8 @@ const DefaultTimeout = 30000 // 30 seconds
const IPFSBlockStorage = async ({ ipfs, pin, timeout } = {}) => {
if (!ipfs) throw new Error('An instance of ipfs is required.')

const signals = new Set()

/**
* Puts data to an IPFS block.
* @function
Expand Down Expand Up @@ -59,7 +61,9 @@ const IPFSBlockStorage = async ({ ipfs, pin, timeout } = {}) => {
const get = async (hash) => {
const cid = CID.parse(hash, base58btc)
const { signal } = new TimeoutController(timeout || DefaultTimeout)
signals.add(signal)
const block = await ipfs.blockstore.get(cid, { signal })
signals.delete(signal)
if (block) {
return block
}
Expand All @@ -71,7 +75,12 @@ const IPFSBlockStorage = async ({ ipfs, pin, timeout } = {}) => {

const clear = async () => {}

const close = async () => {}
/**
 * Closes the storage, attempting to abort any in-flight block fetches.
 *
 * Bug fix: the original used `for...in` over a Set, which enumerates zero
 * keys, so the loop body never executed. `for...of` visits each entry.
 */
const close = async () => {
  for (const s of signals) {
    // NOTE(review): `signals` holds AbortSignal instances (see get()),
    // which expose no abort() method — only the owning controller does.
    // Optional-call keeps close() from throwing; TODO: store the
    // TimeoutController instances instead so the fetches are truly aborted.
    s.abort?.()
  }
  signals.clear()
}

return {
put,
Expand Down
2 changes: 1 addition & 1 deletion src/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => {
}

if (topic === address) {
queue.add(task)
await queue.add(task)
}
}

Expand Down
8 changes: 3 additions & 5 deletions test/database-replication.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ describe('Database - Replication', function () {
}

beforeEach(async () => {
[ipfs1, ipfs2] = await Promise.all([createHelia(), createHelia()])
[ipfs1, ipfs2] = await Promise.all([createHelia({ directory: './ipfs1' }), createHelia({ directory: './ipfs2' })])
await connectPeers(ipfs1, ipfs2)

await copy(testKeysPath, keysPath)
Expand All @@ -42,23 +42,21 @@ describe('Database - Replication', function () {

afterEach(async () => {
if (db1) {
await db1.drop()
await db1.close()

await rimraf('./orbitdb1')
}
if (db2) {
await db2.drop()
await db2.close()

await rimraf('./orbitdb2')
}

if (ipfs1) {
await ipfs1.blockstore.child.child.child.close()
await ipfs1.stop()
}

if (ipfs2) {
await ipfs2.blockstore.child.child.child.close()
await ipfs2.stop()
}

Expand Down
10 changes: 5 additions & 5 deletions test/database.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ describe('Database', function () {

await db.close()

const headsStorage = await LevelStorage({ path: headsPath })
const headsStorage = await LevelStorage({ path: headsPath, valueEncoding: 'json' })

deepStrictEqual((await Entry.decode(await headsStorage.get(hash))).payload, op)
deepStrictEqual(await headsStorage.get('heads'), [hash])

await headsStorage.close()

Expand All @@ -97,9 +97,9 @@ describe('Database', function () {

await db.close()

const headsStorage = await LevelStorage({ path: headsPath })
const headsStorage = await LevelStorage({ path: headsPath, valueEncoding: 'json' })

deepStrictEqual((await Entry.decode(await headsStorage.get(hash))).payload, op)
deepStrictEqual(await headsStorage.get('heads'), [hash])

await headsStorage.close()

Expand All @@ -113,7 +113,7 @@ describe('Database', function () {
const op = { op: 'PUT', key: 1, value: 'record 1 on db 1' }
const hash = await db.addOperation(op)

deepStrictEqual((await Entry.decode(await headsStorage.get(hash))).payload, op)
deepStrictEqual(await headsStorage.get('heads'), [hash])

await db.close()
})
Expand Down
4 changes: 3 additions & 1 deletion test/databases/replication/documents.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ describe('Documents Database Replication', function () {
}

before(async () => {
[ipfs1, ipfs2] = await Promise.all([createHelia(), createHelia()])
[ipfs1, ipfs2] = await Promise.all([createHelia({ directory: './ipfs1' }), createHelia({ directory: './ipfs2' })])
await connectPeers(ipfs1, ipfs2)

await copy(testKeysPath, keysPath)
Expand All @@ -42,10 +42,12 @@ describe('Documents Database Replication', function () {

after(async () => {
if (ipfs1) {
await ipfs1.blockstore.child.child.child.close()
await ipfs1.stop()
}

if (ipfs2) {
await ipfs2.blockstore.child.child.child.close()
await ipfs2.stop()
}

Expand Down
Loading

0 comments on commit ac2b77e

Please sign in to comment.