Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(queue-bundle): wait for bundle to be saved to respond #265

Merged
merged 2 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions scripts/import-data/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ export const fetchWithRetry = async (
if (response.ok) {
return response;
}
if (response.status === 429) {
if (response.status === 429 || response.status === 503) {
console.warn(
`Import queue is full! Waiting 30 seconds before retrying...`,
`${await response.text()}. Waiting 30 seconds before retrying...`,
);
await new Promise((resolve) => setTimeout(resolve, 30000));
continue;
Expand Down Expand Up @@ -88,8 +88,14 @@ export const getFilesInRange = async ({
.filter((file) => path.extname(file) === '.json')
.filter((file) => {
const match = file.match(/^\d+/);
const number = match ? parseInt(match[0], 10) : null;
return number !== null && number >= min && number <= max;
if (!match) return false;
const number = parseInt(match[0], 10);
return number >= min && number <= max;
})
.sort((a, b) => {
const numA = parseInt(a.match(/^\d+/)?.[0] ?? '0', 10);
const numB = parseInt(b.match(/^\d+/)?.[0] ?? '0', 10);
return numA - numB;
});

return filesInRange;
Expand Down
16 changes: 11 additions & 5 deletions src/routes/ar-io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import createPrometheusMiddleware from 'express-prometheus-middleware';

import * as config from '../config.js';
import * as system from '../system.js';
import * as events from '../events.js';
import * as metrics from '../metrics.js';
import { release } from '../version.js';
import { db, signatureStore } from '../system.js';
import log from '../log.js';
import { ParquetExporter } from '../workers/parquet-exporter.js';
import { NormalizedDataItem, PartialJsonTransaction } from '../types.js';

export const arIoRouter = Router();
export let parquetExporter: ParquetExporter | null = null;
Expand Down Expand Up @@ -215,10 +215,16 @@ arIoRouter.post(
return;
}

system.eventEmitter.emit(events.ANS104_BUNDLE_QUEUED, {
id,
root_tx_id: id,
});
const queuedBundle = await system.queueBundle(
{ id, root_tx_id: id } as NormalizedDataItem | PartialJsonTransaction,
true,
true,
);

if (queuedBundle.error !== undefined) {
res.status(503).send(queuedBundle.error);
return;
}

res.json({ message: 'Bundle queued' });
} catch (error: any) {
Expand Down
23 changes: 11 additions & 12 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -479,20 +479,23 @@ export const bundleDataImporter = new BundleDataImporter({
metrics.registerQueueLengthGauge('bundleDataImporter', {
length: () => bundleDataImporter.queueDepth(),
});

async function queueBundle(
export type QueueBundleResponse = {
status: 'skipped' | 'queued' | 'error';
error?: string;
};
export async function queueBundle(
item: NormalizedDataItem | PartialJsonTransaction,
isPrioritized = false,
bypassFilter = false,
) {
): Promise<QueueBundleResponse> {
try {
if ('root_tx_id' in item && item.root_tx_id === null) {
log.debug('Skipping download of optimistically indexed data item', {
id: item.id,
rootTxId: item.root_tx_id,
parentId: item.parent_id,
});
return;
return { status: 'skipped' };
}

await db.saveBundle({
Expand Down Expand Up @@ -530,22 +533,18 @@ async function queueBundle(
skippedAt: currentUnixTimestamp(),
});
}

return { status: 'queued' };
} catch (error: any) {
log.error('Error saving or queueing bundle', {
message: error.message,
stack: error.stack,
});

return { status: 'error', error: 'Error queueing bundle' };
}
}

// Queue bundles from the queue-bundle route
eventEmitter.on(
events.ANS104_BUNDLE_QUEUED,
async (item: NormalizedDataItem | PartialJsonTransaction) => {
await queueBundle(item, true, true);
},
);

// Queue L1 bundles
eventEmitter.on(
events.ANS104_TX_INDEXED,
Expand Down
Loading