Skip to content

Commit

Permalink
[client-sync] Transaction-ify processMessage
Browse files Browse the repository at this point in the history
Summary:
We do lots of writes while processing a single message. To reduce the
write churn from lots of mini-transactions, this diff threads one
overarching transaction to everything in processMessage.

Test Plan: Run locally

Reviewers: spang, evan, juan

Reviewed By: juan

Differential Revision: https://phab.nylas.com/D4394
  • Loading branch information
Mark Hahnenberg committed Apr 7, 2017
1 parent b323e38 commit c808438
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 98 deletions.
14 changes: 8 additions & 6 deletions packages/client-sync/src/message-processor/detect-thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ function emptyThread({Thread, accountId}, options = {}) {
return t;
}

async function findOrBuildByReferences(db, message) {
async function findOrBuildByReferences(db, message, transaction) {
const {Thread, Reference, Label, Folder} = db;

let matchingRef = null;
Expand All @@ -30,6 +30,7 @@ async function findOrBuildByReferences(db, message) {
include: [
{ model: Thread, include: [{model: Label}, {model: Folder}]},
],
transaction,
});
}

Expand All @@ -39,16 +40,17 @@ async function findOrBuildByReferences(db, message) {
return matchingRef ? matchingRef.thread : emptyThread(db, {});
}

async function findOrBuildByRemoteThreadId(db, remoteThreadId) {
async function findOrBuildByRemoteThreadId(db, remoteThreadId, transaction) {
const {Thread, Label, Folder} = db;
const existing = await Thread.find({
where: {remoteThreadId},
include: [{model: Label}, {model: Folder}],
transaction,
});
return existing || emptyThread(db, {remoteThreadId});
}

async function detectThread({db, messageValues}) {
async function detectThread({db, messageValues, transaction}) {
if (!(messageValues.labels instanceof Array)) {
throw new Error("detectThread expects labels to be an inflated array.");
}
Expand All @@ -58,9 +60,9 @@ async function detectThread({db, messageValues}) {

let thread = null;
if (messageValues.gThrId) {
thread = await findOrBuildByRemoteThreadId(db, messageValues.gThrId)
thread = await findOrBuildByRemoteThreadId(db, messageValues.gThrId, transaction)
} else {
thread = await findOrBuildByReferences(db, messageValues)
thread = await findOrBuildByReferences(db, messageValues, transaction)
}

if (!(thread.labels instanceof Array)) {
Expand All @@ -81,7 +83,7 @@ async function detectThread({db, messageValues}) {
}

thread.subject = cleanSubject(messageValues.subject);
await thread.updateFromMessages({messages: [messageValues]});
await thread.updateFromMessages({messages: [messageValues], transaction});
return thread;
}

Expand Down
11 changes: 6 additions & 5 deletions packages/client-sync/src/message-processor/extract-contacts.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ function isContactMeaningful(contact) {
return true
}

async function extractContacts({db, messageValues, logger = console} = {}) {
async function extractContacts({db, messageValues, logger = console, transaction} = {}) {
const {Contact} = db
let allContacts = [];
['to', 'from', 'bcc', 'cc'].forEach((field) => {
Expand All @@ -37,13 +37,14 @@ async function extractContacts({db, messageValues, logger = console} = {}) {
where: {
id: Array.from(contactsDataById.keys()),
},
transaction,
})

for (const c of contactsDataById.values()) {
const existing = existingContacts.find(({id}) => id === c.id);

if (!existing) {
Contact.create(c).catch(Sequelize.ValidationError, (err) => {
Contact.create(c, {transaction}).catch(Sequelize.ValidationError, (err) => {
if (err.name !== "SequelizeUniqueConstraintError") {
logger.warn('Unknown error inserting contact', err);
throw err;
Expand All @@ -52,12 +53,12 @@ async function extractContacts({db, messageValues, logger = console} = {}) {
// and beat us to inserting. Since contacts are never deleted within
// an account, we can safely assume that we can perform an update
// instead.
Contact.find({where: {id: c.id}}).then(
(row) => { row.update(c) });
Contact.find({where: {id: c.id}, transaction}).then(
(row) => { row.update(c, {transaction}) });
}
});
} else {
existing.update(c);
existing.update(c, {transaction});
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions packages/client-sync/src/message-processor/extract-files.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ function collectFilesFromStruct({db, messageValues, struct, fileIds = new Set()}
return collected;
}

async function extractFiles({db, messageValues, struct}) {
const files = collectFilesFromStruct({db, messageValues, struct});
async function extractFiles({db, messageValues, struct, transaction}) {
const files = collectFilesFromStruct({db, messageValues, struct, transaction});
if (files.length > 0) {
for (const file of files) {
await file.save()
await file.save({transaction})
}
}
return Promise.resolve(files)
Expand Down
154 changes: 79 additions & 75 deletions packages/client-sync/src/message-processor/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,60 +103,67 @@ class MessageProcessor {

async _processMessage({db, accountId, folder, imapMessage, struct, desiredParts, logger}) {
try {
const {Message, Folder, Label} = db;
const {sequelize, Message, Folder, Label} = db;
const messageValues = await MessageFactory.parseFromImap(imapMessage, desiredParts, {
db,
folder,
accountId,
});
const existingMessage = await Message.findById(messageValues.id, {
include: [{model: Folder, as: 'folder'}, {model: Label, as: 'labels'}],
});
let processedMessage;
if (existingMessage) {
// TODO: optimize to not do a full message parse for existing messages
processedMessage = await this._processExistingMessage({
logger,
struct,
messageValues,
existingMessage,
})
} else {
processedMessage = await this._processNewMessage({
logger,
struct,
messageValues,
})
}
await sequelize.transaction(async (transaction) => {
const existingMessage = await Message.findById(messageValues.id, {
include: [{model: Folder, as: 'folder'}, {model: Label, as: 'labels'}],
transaction,
});
if (existingMessage) {
// TODO: optimize to not do a full message parse for existing messages
processedMessage = await this._processExistingMessage({
db,
logger,
struct,
messageValues,
existingMessage,
transaction,
})
} else {
processedMessage = await this._processNewMessage({
db,
logger,
struct,
messageValues,
transaction,
})
}

// Inflate the serialized oldestProcessedDate value, if it exists
let oldestProcessedDate;
if (folder.syncState && folder.syncState.oldestProcessedDate) {
oldestProcessedDate = new Date(folder.syncState.oldestProcessedDate);
}
const justProcessedDate = messageValues.date ? new Date(messageValues.date) : new Date()

// Update the oldestProcessedDate if:
// a) justProcessedDate is after the year 1980. We don't want to base this
// off of messages with borked 1970 dates.
// AND
// b) i) We haven't set oldestProcessedDate yet
// OR
// ii) justProcessedDate is before oldestProcessedDate and in a different
// month. (We only use this to update the sync status in Nylas Mail,
// which uses month precision. Updating a folder's syncState triggers
// many re-renders in Nylas Mail, so we only do it as necessary.)
if (justProcessedDate > new Date("1980") && (
!oldestProcessedDate || (
(justProcessedDate.getMonth() !== oldestProcessedDate.getMonth() ||
justProcessedDate.getFullYear() !== oldestProcessedDate.getFullYear()) &&
justProcessedDate < oldestProcessedDate))) {
await folder.updateSyncState({oldestProcessedDate: justProcessedDate})
}
// Inflate the serialized oldestProcessedDate value, if it exists
let oldestProcessedDate;
if (folder.syncState && folder.syncState.oldestProcessedDate) {
oldestProcessedDate = new Date(folder.syncState.oldestProcessedDate);
}
const justProcessedDate = messageValues.date ? new Date(messageValues.date) : new Date()

// Update the oldestProcessedDate if:
// a) justProcessedDate is after the year 1980. We don't want to base this
// off of messages with borked 1970 dates.
// AND
// b) i) We haven't set oldestProcessedDate yet
// OR
// ii) justProcessedDate is before oldestProcessedDate and in a different
// month. (We only use this to update the sync status in Nylas Mail,
// which uses month precision. Updating a folder's syncState triggers
// many re-renders in Nylas Mail, so we only do it as necessary.)
if (justProcessedDate > new Date("1980") && (
!oldestProcessedDate || (
(justProcessedDate.getMonth() !== oldestProcessedDate.getMonth() ||
justProcessedDate.getFullYear() !== oldestProcessedDate.getFullYear()) &&
justProcessedDate < oldestProcessedDate))) {
await folder.updateSyncState({oldestProcessedDate: justProcessedDate}, {transaction})
}

const activity = `🔃 ✉️ (${folder.name}) "${messageValues.subject}" - ${messageValues.date}`
logger.log(activity)
SyncActivity.reportSyncActivity(accountId, activity)
const activity = `🔃 ✉️ (${folder.name}) "${messageValues.subject}" - ${messageValues.date}`
logger.log(activity)
SyncActivity.reportSyncActivity(accountId, activity)
});
return processedMessage
} catch (err) {
await this._onError({imapMessage, desiredParts, folder, err, logger});
Expand Down Expand Up @@ -198,7 +205,7 @@ class MessageProcessor {
// Replaces ["<rfc2822messageid>", ...] with [[object Reference], ...]
// Creates references that do not yet exist, and adds the correct
// associations as well
async _addReferences(db, message, thread, references) {
async _addReferences(db, message, thread, references, transaction) {
const {Reference} = db;

let existingReferences = [];
Expand All @@ -207,6 +214,7 @@ class MessageProcessor {
where: {
rfc2822MessageId: references,
},
transaction,
});
}

Expand All @@ -216,45 +224,43 @@ class MessageProcessor {
}
for (const mid of references) {
if (!refByMessageId[mid]) {
refByMessageId[mid] = await Reference.create({rfc2822MessageId: mid, threadId: thread.id});
refByMessageId[mid] = await Reference.create({rfc2822MessageId: mid, threadId: thread.id}, {transaction});
}
}

const referencesInstances = references.map(mid => refByMessageId[mid]);
await message.addReferences(referencesInstances);
await message.addReferences(referencesInstances, {transaction});
message.referencesOrder = referencesInstances.map(ref => ref.id);
await thread.addReferences(referencesInstances);
await thread.addReferences(referencesInstances, {transaction});
}

async _processNewMessage({messageValues, struct, logger = console} = {}) {
const {accountId} = messageValues;
const db = await LocalDatabaseConnector.forAccount(accountId);
async _processNewMessage({db, messageValues, struct, logger = console, transaction} = {}) {
const {Message} = db

const thread = await detectThread({db, messageValues});
const thread = await detectThread({db, messageValues, transaction});
messageValues.threadId = thread.id;
const createdMessage = await Message.create(messageValues);
const createdMessage = await Message.create(messageValues, {transaction});

if (messageValues.labels) {
await createdMessage.addLabels(messageValues.labels)
await createdMessage.addLabels(messageValues.labels, {transaction})
// Note that the labels aren't officially associated until save() is called later
}

await this._addReferences(db, createdMessage, thread, messageValues.references);
await this._addReferences(db, createdMessage, thread, messageValues.references, transaction);

// TODO: need to delete dangling references somewhere (maybe at the
// end of the sync loop?)

const files = await extractFiles({db, messageValues, struct});
const files = await extractFiles({db, messageValues, struct, transaction});
// Don't count inline images (files with contentIds) as attachments
if (files.some(f => !f.contentId) && !thread.hasAttachments) {
thread.hasAttachments = true;
await thread.save();
await thread.save({transaction});
}
await extractContacts({db, messageValues, logger});
await extractContacts({db, messageValues, logger, transaction});

createdMessage.isProcessed = true;
await createdMessage.save()
await createdMessage.save({transaction})
return createdMessage
}

Expand All @@ -271,10 +277,7 @@ class MessageProcessor {
* or because we interrupted the sync loop before the message was fully
* processed.
*/
async _processExistingMessage({existingMessage, messageValues, struct} = {}) {
const {accountId} = messageValues;
const db = await LocalDatabaseConnector.forAccount(accountId);

async _processExistingMessage({db, existingMessage, messageValues, struct, transaction} = {}) {
/**
* There should never be a reason to update the body of a message
* already in the database.
Expand All @@ -288,38 +291,39 @@ class MessageProcessor {
*/
const newMessageWithoutBody = _.clone(messageValues)
delete newMessageWithoutBody.body;
await existingMessage.update(newMessageWithoutBody);
await existingMessage.update(newMessageWithoutBody, {transaction});
if (messageValues.labels && messageValues.labels.length > 0) {
await existingMessage.setLabels(messageValues.labels)
await existingMessage.setLabels(messageValues.labels, {transaction})
}

let thread = await existingMessage.getThread({
include: [{model: db.Folder, as: 'folders'}, {model: db.Label, as: 'labels'}],
transaction,
});
if (!existingMessage.isProcessed) {
if (!thread) {
thread = await detectThread({db, messageValues});
thread = await detectThread({db, messageValues, transaction});
existingMessage.threadId = thread.id;
} else {
await thread.updateFromMessages({db, messages: [existingMessage]})
await thread.updateFromMessages({db, messages: [existingMessage], transaction})
}
await this._addReferences(db, existingMessage, thread, messageValues.references);
const files = await extractFiles({db, messageValues: existingMessage, struct});
await this._addReferences(db, existingMessage, thread, messageValues.references, transaction);
const files = await extractFiles({db, messageValues: existingMessage, struct, transaction});
// Don't count inline images (files with contentIds) as attachments
if (files.some(f => !f.contentId) && !thread.hasAttachments) {
thread.hasAttachments = true;
await thread.save();
await thread.save({transaction});
}
await extractContacts({db, messageValues: existingMessage});
await extractContacts({db, messageValues: existingMessage, transaction});
existingMessage.isProcessed = true;
} else {
if (!thread) {
throw new Error(`Existing processed message ${existingMessage.id} doesn't have thread`)
}
}

await existingMessage.save();
await thread.updateLabelsAndFolders();
await existingMessage.save({transaction});
await thread.updateLabelsAndFolders({transaction});
return existingMessage
}
}
Expand Down
6 changes: 3 additions & 3 deletions packages/client-sync/src/models/folder.es6
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ export default (sequelize, Sequelize) => {
)
},

async updateSyncState(nextSyncState = {}) {
async updateSyncState(nextSyncState = {}, {transaction} = {}) {
if (_.isMatch(this.syncState, nextSyncState)) {
return Promise.resolve();
}
await this.reload(); // Fetch any recent syncState updates
await this.reload({transaction}); // Fetch any recent syncState updates
this.syncState = Object.assign(this.syncState, nextSyncState);
return this.save();
return this.save({transaction});
},

syncProgress() {
Expand Down
Loading

0 comments on commit c808438

Please sign in to comment.