Skip to content

Commit

Permalink
feat: cf sync pipeline (nftstorage#201)
Browse files Browse the repository at this point in the history
  • Loading branch information
hugomrdias authored Jul 7, 2021
1 parent 9df8dce commit 50a3b3f
Show file tree
Hide file tree
Showing 10 changed files with 902 additions and 4 deletions.
68 changes: 68 additions & 0 deletions packages/tools/cf-sync/cf.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
const { URL } = require('url')
const path = require('path')
const got = require('got').default

require('dotenv').config({
path: path.join(__dirname, '../.env.local'),
})
const endpoint = new URL('https://api.cloudflare.com/client/v4/')
const CF_ACCOUNT_ID = process.env.CF_ACCOUNT_ID
const CF_TOKEN = process.env.CF_TOKEN

/**
* @param {string} kv
*/
function paginate(kv) {
return got.paginate(
new URL(
`accounts/${CF_ACCOUNT_ID}/storage/kv/namespaces/${kv}/keys`,
endpoint
),
{
headers: { Authorization: `bearer ${CF_TOKEN}` },
pagination: {
transform: (rsp) => {
// @ts-ignore
const data = JSON.parse(rsp.body)
return data.result
},
paginate: (rsp) => {
// @ts-ignore
const { count, cursor } = JSON.parse(rsp.body).result_info
if (!cursor) {
return false
}

return {
searchParams: {
cursor,
},
}
},
},
}
)
}

/**
* @param {any} kv
* @param {any} key
*/
async function get(kv, key) {
return await got
.get(
new URL(
`accounts/${CF_ACCOUNT_ID}/storage/kv/namespaces/${kv}/values/${key}`,
endpoint
),
{
headers: { Authorization: `bearer ${CF_TOKEN}` },
}
)
.json()
}

module.exports = {
paginate,
get,
}
74 changes: 74 additions & 0 deletions packages/tools/cf-sync/cli.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
const path = require('path')
require('dotenv').config({
path: path.resolve(__dirname, '.env.local'),
})
const Listr = require('listr')
const Store = require('./store')
const Cluster = require('./cluster')
const { paginate } = require('./cf')
const { syncNFTsData, syncCheck, validateLocal } = require('./utils')

const tasks = new Listr(
[
{
title: 'Sync KV metadata',
task: async (/** @type {Context} */ ctx, task) => {
let count = 0
let missing = 0
const metadata = paginate(ctx.kvNFT)

for await (const item of metadata) {
if (!(await store.has(item.name))) {
await store.put(item.name, item)
missing++
}
count++
task.output = `Total: ${count} New: ${missing}`
}

task.title += ` -> Total: ${count} New: ${missing}`
},
},
{
title: 'Sync KV data',
task: syncNFTsData,
},
{
title: 'Sync deals and pin',
task: syncCheck,
},
{
title: 'Validate local data',
task: validateLocal,
},
],
{
renderer: 'default',
}
)

const store = new Store(path.join(__dirname, 'nft-meta'))
const cluster = new Cluster()

tasks
.run({
nftStorageToken: process.env.NFT_STORAGE_TOKEN,
kvNFT: '9610798d5e5845fa94e4392cc1288ddf',
store,
cluster,
})
.then(() => {
process.exit(0)
})
.catch((err) => {
console.error(err)
})

/**
* @typedef {Object} Context
* @prop {string} kvNFT
* @prop {Store} store
* @prop {Cluster} cluster
* @prop {string} [nftStorageToken]
*
*/
66 changes: 66 additions & 0 deletions packages/tools/cf-sync/cluster.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
const path = require('path')
const { URL } = require('url')
require('dotenv').config({
path: path.resolve(__dirname, '.env.local'),
})
const got = require('got').default
const defaults = {
url: 'https://nft.storage.ipfscluster.io/api/',
headers: { Authorization: `Basic ${process.env.CLUSTER_TOKEN}` },
}

class Cluster {
// @ts-ignore
constructor(options) {
this.options = {
...defaults,
...options,
}
}

/**
* @param {string} cid
*/
async status(cid) {
const url = new URL(`pins/${cid}`, this.options.url)

const data = await got(url, {
headers: this.options.headers,
}).json()

let status = 'failed'

const pinInfos = Object.values(data['peer_map'])
if (pinInfos.some((i) => i.status === 'pinned')) {
status = 'pinned'
} else if (pinInfos.some((i) => i.status === 'pinning')) {
status = 'pinning'
} else if (pinInfos.some((i) => i.status === 'queued')) {
status = 'queued'
}

return {
cid: data.cid['/'],
status,
date: pinInfos[0].timestamp,
}
}

/**
* @param {string} cid
*/
async size(cid) {
const url = new URL(
`dag/stat?arg=${cid}&progress=false`,
'https://nft.storage.ipfscluster.io/api/v0/'
)

const data = await got(url, {
headers: this.options.headers,
}).json()

return data.Size
}
}

module.exports = Cluster
16 changes: 16 additions & 0 deletions packages/tools/cf-sync/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Usage

Create a `.env.local` file with the following

```text
CF_TOKEN=<cloudflare token>
CF_ACCOUNT_ID=<cloudflare account id>
NFT_STORAGE_TOKEN=<nft.storage token>
CLUSTER_TOKEN=<ipfs cluster token>
```

Put the LevelDB folder inside this folder with the name `nft-meta` and run the cli

```bash
node cli.js
```
20 changes: 20 additions & 0 deletions packages/tools/cf-sync/schema.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
export type LocalNFT = {
name: string
size: number
pinStatus: 'queued' | 'pinning' | 'pinned' | 'failed'
data: {
cid: string
created: string
type: string
scope: string
files: Array<{ name: string; type?: string }>
pin?: {
name?: string
meta?: Record<string, string>
}
}
deals: Array<{
status: 'failed' | 'published' | 'active' | 'terminated' | 'queued'
lastChanged: 'string'
}>
}
69 changes: 69 additions & 0 deletions packages/tools/cf-sync/store.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
const level = require('level')
const merge = require('merge-options')

class Store {
/**
* @param {string} name
*/
constructor(name) {
this.store = level(name, { valueEncoding: 'json' })
}

/**
* @param {string} key
*/
async get(key) {
try {
return await this.store.get(key)
} catch (err) {
if (err.notFound) {
return undefined
}
throw err
}
}

/**
* @param {string} key
*/
async has(key) {
return (await this.get(key)) !== undefined
}

/**
* @param {string} key
* @param {string} prop
*/
async hasProp(key, prop) {
const value = await this.get(key)

if (value && value[prop] !== undefined) {
return true
}
return false
}

/**
* @param {string} key
* @param {any} value
*/
async put(key, value) {
const prevValue = await this.get(key)
await this.store.put(key, prevValue ? merge(prevValue, value) : value)
}

/**
*
* @returns {AsyncIterable<{key: string, value: any}>}
*/
iterator() {
// @ts-ignore
return this.store.createReadStream()
}

close() {
return this.store.close()
}
}

module.exports = Store
Loading

0 comments on commit 50a3b3f

Please sign in to comment.