forked from nftstorage/nft.storage
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: cf sync pipeline (nftstorage#201)
- Loading branch information
1 parent
9df8dce
commit 50a3b3f
Showing
10 changed files
with
902 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
const { URL } = require('url') | ||
const path = require('path') | ||
const got = require('got').default | ||
|
||
require('dotenv').config({ | ||
path: path.join(__dirname, '../.env.local'), | ||
}) | ||
const endpoint = new URL('https://api.cloudflare.com/client/v4/') | ||
const CF_ACCOUNT_ID = process.env.CF_ACCOUNT_ID | ||
const CF_TOKEN = process.env.CF_TOKEN | ||
|
||
/** | ||
* @param {string} kv | ||
*/ | ||
function paginate(kv) { | ||
return got.paginate( | ||
new URL( | ||
`accounts/${CF_ACCOUNT_ID}/storage/kv/namespaces/${kv}/keys`, | ||
endpoint | ||
), | ||
{ | ||
headers: { Authorization: `bearer ${CF_TOKEN}` }, | ||
pagination: { | ||
transform: (rsp) => { | ||
// @ts-ignore | ||
const data = JSON.parse(rsp.body) | ||
return data.result | ||
}, | ||
paginate: (rsp) => { | ||
// @ts-ignore | ||
const { count, cursor } = JSON.parse(rsp.body).result_info | ||
if (!cursor) { | ||
return false | ||
} | ||
|
||
return { | ||
searchParams: { | ||
cursor, | ||
}, | ||
} | ||
}, | ||
}, | ||
} | ||
) | ||
} | ||
|
||
/** | ||
* @param {any} kv | ||
* @param {any} key | ||
*/ | ||
async function get(kv, key) { | ||
return await got | ||
.get( | ||
new URL( | ||
`accounts/${CF_ACCOUNT_ID}/storage/kv/namespaces/${kv}/values/${key}`, | ||
endpoint | ||
), | ||
{ | ||
headers: { Authorization: `bearer ${CF_TOKEN}` }, | ||
} | ||
) | ||
.json() | ||
} | ||
|
||
module.exports = { | ||
paginate, | ||
get, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
const path = require('path') | ||
require('dotenv').config({ | ||
path: path.resolve(__dirname, '.env.local'), | ||
}) | ||
const Listr = require('listr') | ||
const Store = require('./store') | ||
const Cluster = require('./cluster') | ||
const { paginate } = require('./cf') | ||
const { syncNFTsData, syncCheck, validateLocal } = require('./utils') | ||
|
||
const tasks = new Listr( | ||
[ | ||
{ | ||
title: 'Sync KV metadata', | ||
task: async (/** @type {Context} */ ctx, task) => { | ||
let count = 0 | ||
let missing = 0 | ||
const metadata = paginate(ctx.kvNFT) | ||
|
||
for await (const item of metadata) { | ||
if (!(await store.has(item.name))) { | ||
await store.put(item.name, item) | ||
missing++ | ||
} | ||
count++ | ||
task.output = `Total: ${count} New: ${missing}` | ||
} | ||
|
||
task.title += ` -> Total: ${count} New: ${missing}` | ||
}, | ||
}, | ||
{ | ||
title: 'Sync KV data', | ||
task: syncNFTsData, | ||
}, | ||
{ | ||
title: 'Sync deals and pin', | ||
task: syncCheck, | ||
}, | ||
{ | ||
title: 'Validate local data', | ||
task: validateLocal, | ||
}, | ||
], | ||
{ | ||
renderer: 'default', | ||
} | ||
) | ||
|
||
const store = new Store(path.join(__dirname, 'nft-meta')) | ||
const cluster = new Cluster() | ||
|
||
tasks | ||
.run({ | ||
nftStorageToken: process.env.NFT_STORAGE_TOKEN, | ||
kvNFT: '9610798d5e5845fa94e4392cc1288ddf', | ||
store, | ||
cluster, | ||
}) | ||
.then(() => { | ||
process.exit(0) | ||
}) | ||
.catch((err) => { | ||
console.error(err) | ||
}) | ||
|
||
/** | ||
* @typedef {Object} Context | ||
* @prop {string} kvNFT | ||
* @prop {Store} store | ||
* @prop {Cluster} cluster | ||
* @prop {string} [nftStorageToken] | ||
* | ||
*/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
const path = require('path') | ||
const { URL } = require('url') | ||
require('dotenv').config({ | ||
path: path.resolve(__dirname, '.env.local'), | ||
}) | ||
const got = require('got').default | ||
const defaults = { | ||
url: 'https://nft.storage.ipfscluster.io/api/', | ||
headers: { Authorization: `Basic ${process.env.CLUSTER_TOKEN}` }, | ||
} | ||
|
||
class Cluster { | ||
// @ts-ignore | ||
constructor(options) { | ||
this.options = { | ||
...defaults, | ||
...options, | ||
} | ||
} | ||
|
||
/** | ||
* @param {string} cid | ||
*/ | ||
async status(cid) { | ||
const url = new URL(`pins/${cid}`, this.options.url) | ||
|
||
const data = await got(url, { | ||
headers: this.options.headers, | ||
}).json() | ||
|
||
let status = 'failed' | ||
|
||
const pinInfos = Object.values(data['peer_map']) | ||
if (pinInfos.some((i) => i.status === 'pinned')) { | ||
status = 'pinned' | ||
} else if (pinInfos.some((i) => i.status === 'pinning')) { | ||
status = 'pinning' | ||
} else if (pinInfos.some((i) => i.status === 'queued')) { | ||
status = 'queued' | ||
} | ||
|
||
return { | ||
cid: data.cid['/'], | ||
status, | ||
date: pinInfos[0].timestamp, | ||
} | ||
} | ||
|
||
/** | ||
* @param {string} cid | ||
*/ | ||
async size(cid) { | ||
const url = new URL( | ||
`dag/stat?arg=${cid}&progress=false`, | ||
'https://nft.storage.ipfscluster.io/api/v0/' | ||
) | ||
|
||
const data = await got(url, { | ||
headers: this.options.headers, | ||
}).json() | ||
|
||
return data.Size | ||
} | ||
} | ||
|
||
module.exports = Cluster |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
# Usage | ||
|
||
Create a `.env.local` file with the following | ||
|
||
```text | ||
CF_TOKEN=<cloudflare token> | ||
CF_ACCOUNT_ID=<cloudflare account id> | ||
NFT_STORAGE_TOKEN=<nft.storage token> | ||
CLUSTER_TOKEN=<ipfs cluster token> | ||
``` | ||
|
||
Put the LevelDB folder inside this folder with the name `nft-meta` and run the cli | ||
|
||
```bash | ||
node cli.js | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
export type LocalNFT = { | ||
name: string | ||
size: number | ||
pinStatus: 'queued' | 'pinning' | 'pinned' | 'failed' | ||
data: { | ||
cid: string | ||
created: string | ||
type: string | ||
scope: string | ||
files: Array<{ name: string; type?: string }> | ||
pin?: { | ||
name?: string | ||
meta?: Record<string, string> | ||
} | ||
} | ||
deals: Array<{ | ||
status: 'failed' | 'published' | 'active' | 'terminated' | 'queued' | ||
lastChanged: 'string' | ||
}> | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
const level = require('level') | ||
const merge = require('merge-options') | ||
|
||
class Store { | ||
/** | ||
* @param {string} name | ||
*/ | ||
constructor(name) { | ||
this.store = level(name, { valueEncoding: 'json' }) | ||
} | ||
|
||
/** | ||
* @param {string} key | ||
*/ | ||
async get(key) { | ||
try { | ||
return await this.store.get(key) | ||
} catch (err) { | ||
if (err.notFound) { | ||
return undefined | ||
} | ||
throw err | ||
} | ||
} | ||
|
||
/** | ||
* @param {string} key | ||
*/ | ||
async has(key) { | ||
return (await this.get(key)) !== undefined | ||
} | ||
|
||
/** | ||
* @param {string} key | ||
* @param {string} prop | ||
*/ | ||
async hasProp(key, prop) { | ||
const value = await this.get(key) | ||
|
||
if (value && value[prop] !== undefined) { | ||
return true | ||
} | ||
return false | ||
} | ||
|
||
/** | ||
* @param {string} key | ||
* @param {any} value | ||
*/ | ||
async put(key, value) { | ||
const prevValue = await this.get(key) | ||
await this.store.put(key, prevValue ? merge(prevValue, value) : value) | ||
} | ||
|
||
/** | ||
* | ||
* @returns {AsyncIterable<{key: string, value: any}>} | ||
*/ | ||
iterator() { | ||
// @ts-ignore | ||
return this.store.createReadStream() | ||
} | ||
|
||
close() { | ||
return this.store.close() | ||
} | ||
} | ||
|
||
module.exports = Store |
Oops, something went wrong.