Skip to content

Commit

Permalink
feat: cron jobs for metrics and pin status tracking (nftstorage#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alan Shaw authored Apr 23, 2021
1 parent 29af6af commit c429e41
Show file tree
Hide file tree
Showing 11 changed files with 293 additions and 182 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ wrangler kv:namespace create NFTS --preview --env USER
# same as above
wrangler kv:namespace create DEALS --preview --env USER
# same as above
wrangler kv:namespace create METRICS --preview --env USER
# same as above
```

Go to `/site/src/constants.js` _uncomment_ the first line and run `wrangler publish --env USER`.
Expand Down Expand Up @@ -150,6 +152,8 @@ wrangler kv:namespace create NFTS --env production
# Follow the instructions from the cli output
wrangler kv:namespace create DEALS --env production
# Follow the instructions from the cli output
wrangler kv:namespace create METRICS --env production
# Follow the instructions from the cli output
wrangler secret put AUTH0_DOMAIN --env production # Get from auth0 account
wrangler secret put AUTH0_CLIENT_ID --env production # Get from auth0 account
wrangler secret put AUTH0_CLIENT_SECRET --env production # Get from auth0 account
Expand Down
1 change: 1 addition & 0 deletions site/src/bindings.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ declare global {
const DEALS: KVNamespace
const USERS: KVNamespace
const NFTS: KVNamespace
const METRICS: KVNamespace
const PINATA_JWT: string
const CLUSTER_API_URL: string
const CLUSTER_BASIC_AUTH_TOKEN: string
Expand Down
1 change: 1 addition & 0 deletions site/src/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export async function dagSize(cid) {
cluster.ipfsProxyApiUrl
)
const response = await fetch(url.toString(), {
method: 'POST',
headers: { Authorization: `Basic ${cluster.ipfsProxyBasicAuthToken}` },
})
if (!response.ok) {
Expand Down
1 change: 1 addition & 0 deletions site/src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export const stores = {
deals: DEALS,
users: USERS,
nfts: NFTS,
metrics: METRICS,
}

export const auth0 = {
Expand Down
26 changes: 24 additions & 2 deletions site/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Router } from './utils/router.js'
import { homepage } from './routes/homepage.js'
import { auth } from './routes/auth.js'
import { logout } from './routes/logout.js'
import { notFound } from './utils/utils.js'
import { notFound, timed } from './utils/utils.js'
import { cors, postCors } from './routes/cors.js'
import { upload } from './routes/nfts-upload.js'
import { status } from './routes/nfts-get.js'
Expand All @@ -22,11 +22,17 @@ import { pinsList } from './routes/pins-list.js'
import { pinsReplace } from './routes/pins-replace.js'
import { pinsDelete } from './routes/pins-delete.js'
import { metrics } from './routes/metrics.js'
import {
updateUserMetrics,
updateNftMetrics,
updateNftDealMetrics,
} from './jobs/metrics.js'
import { updatePinStatuses } from './jobs/pins.js'

const r = new Router({
onError(req, err) {
return HTTPError.respond(err)
}
},
})

// Site
Expand Down Expand Up @@ -60,3 +66,19 @@ r.add('delete', '/api/internal/tokens', tokensDelete)

r.add('all', '*', notFound)
addEventListener('fetch', r.listen.bind(r))

// Cron jobs
addEventListener('scheduled', (event) =>
event.waitUntil(
(async () => {
await timed(updateUserMetrics, 'CRON updateUserMetrics')
await timed(updateNftMetrics, 'CRON updateNftMetrics')
})()
)
)
addEventListener('scheduled', (event) =>
event.waitUntil(timed(updateNftDealMetrics, 'CRON updateNftDealMetrics'))
)
addEventListener('scheduled', (event) =>
event.waitUntil(timed(updatePinStatuses, 'CRON updatePinStatuses'))
)
148 changes: 148 additions & 0 deletions site/src/jobs/metrics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import { stores } from '../constants.js'
import * as metrics from '../models/metrics.js'
import * as deals from '../models/deals.js'

/**
* @typedef {{
* queued: number,
* proposing: number,
* accepted: number,
* failed: number,
* published: number,
* active: number,
* terminated: number
* }} DealsSummary
*/

// TODO: keep running total?
export async function updateUserMetrics() {
let total = 0
let done = false
let cursor
while (!done) {
// @ts-ignore
const users = await stores.users.list({ cursor })
total += users.keys.length
cursor = users.cursor
done = users.list_complete
}
await metrics.set('users:total', total)
}

// TODO: keep running totals?
export async function updateNftMetrics() {
let total = 0
let totalBytes = 0
let totalPins = 0
let done = false
let cursor
while (!done) {
// @ts-ignore
const nftList = await stores.nfts.list({ cursor, limit: 1000 })
total += nftList.keys.length
for (const k of nftList.keys) {
if (!k.metadata) continue
totalBytes += k.metadata.size || 0
if (k.metadata.pinStatus === 'pinned') {
totalPins++
}
}
cursor = nftList.cursor
done = nftList.list_complete
}
await Promise.all([
// Total number of NFTs stored on nft.storage
metrics.set('nfts:total', total),
// Total bytes of all NFTs
metrics.set('nfts:totalBytes', totalBytes),
// Total number of NFTs pinned on IPFS
metrics.set('nfts:pins:total', totalPins),
])
}

// TODO: keep running totals?
export async function updateNftDealMetrics() {
const totals = {
queued: 0,
proposing: 0,
accepted: 0,
failed: 0,
published: 0,
active: 0,
terminated: 0,
unknown: 0,
}
let done = false
let cursor
while (!done) {
// @ts-ignore
const dealList = await stores.deals.list({ cursor, limit: 1000 })
for (const k of dealList.keys) {
/** @type {DealsSummary} */
let summary = k.metadata
// TODO: remove when ALL deals have summary in metadata
if (summary == null) {
const d = await deals.get(k.name)
if (!d.length) continue
summary = getDealsSummary(d)
}
const status = getEffectiveStatus(summary)
totals[status]++
}
cursor = dealList.cursor
done = dealList.list_complete
}
await Promise.all([
// Total number of NFTs stored on Filecoin in active deals
metrics.set('nfts:deals:active:total', totals.active),
metrics.set('nfts:deals:published:total', totals.published),
metrics.set('nfts:deals:accepted:total', totals.accepted),
metrics.set('nfts:deals:proposing:total', totals.proposing),
// Total number of NFTs queued for the next deal batch
metrics.set('nfts:deals:queued:total', totals.queued),
metrics.set('nfts:deals:failed:total', totals.failed),
metrics.set('nfts:deals:terminated:total', totals.terminated),
])
}

/**
* @param {import('../bindings.js').Deal[]} deals
* @returns {DealsSummary}
*/
function getDealsSummary(deals) {
const summary = {
queued: 0,
proposing: 0,
accepted: 0,
failed: 0,
published: 0,
active: 0,
terminated: 0,
}
deals.forEach((d) => {
summary[d.status]++
})
return summary
}

/**
*
* @param {DealsSummary} summary
* @returns {import('../bindings.js').Deal['status'] | 'unknown'}
*/
function getEffectiveStatus(summary) {
/** @type import('../bindings.js').Deal['status'][] */
const orderedStatues = [
'active',
'published',
'accepted',
'proposing',
'queued',
'failed',
'terminated',
]
for (const s of orderedStatues) {
if (summary[s]) return s
}
return 'unknown'
}
51 changes: 51 additions & 0 deletions site/src/jobs/pins.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { stores } from '../constants.js'
import * as cluster from '../cluster.js'

export async function updatePinStatuses() {
let done = false
let cursor
while (!done) {
// @ts-ignore
const nftList = await stores.nfts.list({ cursor, limit: 1000 })
for (const k of nftList.keys) {
const cid = k.name.split(':')[1]
// Look up size for pinned data via pinning service API
if (k.metadata == null || !isPinnedOrFailed(k.metadata.pinStatus)) {
try {
const pinStatus = cluster.toPSAStatus(await cluster.status(cid))
if (!isPinnedOrFailed(pinStatus)) continue
const d = await stores.nfts.getWithMetadata(k.name)
if (d.value == null) throw new Error('missing NFT')
/** @type import('../bindings').NFT */
const nft = JSON.parse(d.value)
const prevStatus = nft.pin.status
nft.pin.status = pinStatus
const prevSize = nft.size
if (pinStatus === 'pinned') {
// for successful pin we can update the size
nft.size = nft.pin.size = nft.size || (await cluster.dagSize(cid))
}
// @ts-ignore
const metadata = { ...(d.metadata || {}), pinStatus, size: nft.size }
await stores.nfts.put(k.name, JSON.stringify(nft), { metadata })
console.log(
`${cid}: pin status ${prevStatus} => ${nft.pin.status}, size ${prevSize} => ${nft.size}`
)
} catch (err) {
console.error(
`${cid}: failed to update pin status and size: ${err.stack}`
)
}
}
}
cursor = nftList.cursor
done = nftList.list_complete
}
}

/**
* @param {string} status
*/
function isPinnedOrFailed(status) {
return ['pinned', 'failed'].includes(status)
}
18 changes: 18 additions & 0 deletions site/src/models/metrics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { stores as metrics } from '../constants.js'

/**
* @param {string} key
* @returns {Promise<any>}
*/
export async function get(key) {
return metrics.metrics.get(key, 'json')
}

/**
* @param {string} key
* @param {any} value
* @returns {Promise<void>}
*/
export const set = async (key, value) => {
await metrics.metrics.put(key, JSON.stringify(value))
}
Loading

0 comments on commit c429e41

Please sign in to comment.