/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
 * You can obtain one at http://mozilla.org/MPL/2.0/. */

"use strict";

this.EXPORTED_SYMBOLS = [
  "Sync",
];

const { classes: Cc, interfaces: Ci, utils: Cu } = Components;

Cu.import("resource://gre/modules/Log.jsm");
Cu.import("resource://gre/modules/Task.jsm");
Cu.import("resource://gre/modules/XPCOMUtils.jsm");

XPCOMUtils.defineLazyModuleGetter(this, "Preferences",
  "resource://gre/modules/Preferences.jsm");
XPCOMUtils.defineLazyModuleGetter(this, "ReadingList",
  "resource:///modules/readinglist/ReadingList.jsm");
XPCOMUtils.defineLazyModuleGetter(this, "ServerClient",
  "resource:///modules/readinglist/ServerClient.jsm");

// The maximum number of sub-requests per POST /batch supported by the server.
// See http://readinglist.readthedocs.org/en/latest/api/batch.html.
const BATCH_REQUEST_LIMIT = 25;

// The Last-Modified header of server responses is stored here.
const SERVER_LAST_MODIFIED_HEADER_PREF = "readinglist.sync.serverLastModified";

// Maps local record properties to server record properties.  The keys are the
// property names used in local records; the values are the corresponding
// property names used in server records.
const SERVER_PROPERTIES_BY_LOCAL_PROPERTIES = {
  guid: "id",
  serverLastModified: "last_modified",
  url: "url",
  preview: "preview",
  title: "title",
  resolvedURL: "resolved_url",
  resolvedTitle: "resolved_title",
  excerpt: "excerpt",
  archived: "archived",
  deleted: "deleted",
  favorite: "favorite",
  isArticle: "is_article",
  wordCount: "word_count",
  unread: "unread",
  addedBy: "added_by",
  addedOn: "added_on",
  storedOn: "stored_on",
  markedReadBy: "marked_read_by",
  markedReadOn: "marked_read_on",
  readPosition: "read_position",
};

// Local record properties that can be uploaded in new items.
const NEW_RECORD_PROPERTIES = `
  url
  title
  resolvedURL
  resolvedTitle
  excerpt
  favorite
  isArticle
  wordCount
  unread
  addedBy
  addedOn
  markedReadBy
  markedReadOn
  readPosition
  preview
`.trim().split(/\s+/);

// Local record properties that can be uploaded in changed items.
const MUTABLE_RECORD_PROPERTIES = `
  title
  resolvedURL
  resolvedTitle
  excerpt
  favorite
  isArticle
  wordCount
  unread
  markedReadBy
  markedReadOn
  readPosition
  preview
`.trim().split(/\s+/);

let log = Log.repository.getLogger("readinglist.sync");


/**
 * An object that syncs reading list state with a server.  To sync, make a new
 * SyncImpl object and then call start() on it.
 *
 * @param readingList The ReadingList to sync.
 */
function SyncImpl(readingList) {
  this.list = readingList;
  this._client = new ServerClient();
}

/**
 * This implementation uses the sync algorithm described here:
 * https://github.com/mozilla-services/readinglist/wiki/Client-phases
 * The "phases" mentioned in the methods below refer to the phases in that
 * document.
 */
SyncImpl.prototype = {

  /**
   * Starts sync, if it's not already started.
   *
   * @return Promise this.promise, i.e., a promise that will be resolved
   *         when sync completes, rejected on error.
   */
  start() {
    if (!this.promise) {
      this.promise = Task.spawn(function* () {
        try {
          yield this._start();
        } finally {
          // Clear the in-progress marker whether sync succeeded or failed so
          // that a later start() call begins a fresh sync.
          delete this.promise;
        }
      }.bind(this));
    }
    return this.promise;
  },

  /**
   * A Promise that will be non-null when sync is ongoing.  Resolved when
   * sync completes, rejected on error.
   */
  promise: null,

  /**
   * Runs the sync phases in order.  See the document linked above that
   * describes the sync algorithm.
   */
  _start: Task.async(function* () {
    log.info("Starting sync");
    yield this._logDiagnostics();
    yield this._uploadStatusChanges();
    yield this._uploadNewItems();
    yield this._uploadDeletedItems();
    yield this._downloadModifiedItems();

    // TODO: "Repeat [this phase] until no conflicts occur," says the doc.
    yield this._uploadMaterialChanges();

    log.info("Sync done");
  }),

  /**
   * Phase 0 - for debugging we log some stuff about the local store before
   * we start syncing.
   * We only do this when the log level is "Trace" or lower as the info (a)
   * may be expensive to generate, (b) generate a lot of output and (c) may
   * contain private information.
   */
  _logDiagnostics: Task.async(function* () {
    // Sadly our log is likely to have Log.Level.All, so loop over our
    // appenders looking for the effective level.
    let smallestLevel = log.appenders.reduce(
      (prev, appender) => Math.min(prev, appender.level),
      Log.Level.Error);

    if (smallestLevel > Log.Level.Trace) {
      return;
    }

    let localItems = [];
    yield this.list.forEachItem(localItem => localItems.push(localItem));
    log.trace("Have " + localItems.length + " local item(s)");
    for (let localItem of localItems) {
      // We need to use ._record so we get access to a couple of the
      // "internal" fields.
      let record = localItem._record;
      let redacted = {};
      for (let attr of ["guid", "url", "resolvedURL", "serverLastModified", "syncStatus"]) {
        redacted[attr] = record[attr];
      }
      log.trace(JSON.stringify(redacted));
    }
    // and the GUIDs of deleted items.
    let deletedGuids = [];
    yield this.list.forEachSyncedDeletedGUID(guid => deletedGuids.push(guid));
    // This might be a huge line, but that's OK.
    log.trace("Have ${num} deleted item(s): ${deletedGuids}", {num: deletedGuids.length, deletedGuids});
  }),

  /**
   * Phase 1 part 1
   *
   * Uploads not-new items with status-only changes.  By design, status-only
   * changes will never conflict with what's on the server.
   */
  _uploadStatusChanges: Task.async(function* () {
    log.debug("Phase 1 part 1: Uploading status changes");
    yield this._uploadChanges(ReadingList.SyncStatus.CHANGED_STATUS,
                              ReadingList.SyncStatusProperties.STATUS);
  }),

  /**
   * There are two phases for uploading changed not-new items: one for items
   * with status-only changes, one for items with material changes.  The two
   * work similarly mechanically, and this method is a helper for both.
   *
   * @param syncStatus Local items matching this sync status will be uploaded.
   * @param localProperties An array of local record property names.  The
   *        uploaded item records will include only these properties.
   */
  _uploadChanges: Task.async(function* (syncStatus, localProperties) {
    // Get local items that match the given syncStatus.
    let requests = [];
    yield this.list.forEachItem(localItem => {
      requests.push({
        path: "/articles/" + localItem.guid,
        body: serverRecordFromLocalItem(localItem, localProperties),
      });
    }, { syncStatus: syncStatus });
    if (!requests.length) {
      log.debug("No local changes to upload");
      return;
    }

    // Send the request.
    let request = {
      body: {
        defaults: {
          method: "PATCH",
        },
        requests: requests,
      },
    };
    let batchResponse = yield this._postBatch(request);
    if (batchResponse.status != 200) {
      this._handleUnexpectedResponse(true, "uploading changes", batchResponse);
      return;
    }

    // Update local items based on the response.
    for (let response of batchResponse.body.responses) {
      if (response.status == 404) {
        // item deleted
        // NOTE(review): this relies on the 404 sub-response body carrying the
        // item's id -- confirm against the server's batch API.
        yield this._deleteItemForGUID(response.body.id);
        continue;
      }
      if (response.status == 409) {
        // "Conflict": A change violated a uniqueness constraint.  Mark the item
        // as having material changes, and reconcile and upload it in the
        // material-changes phase.
        // TODO
        continue;
      }
      if (response.status != 200) {
        this._handleUnexpectedResponse(false, "uploading a change", response);
        continue;
      }
      // Don't assume the local record and the server record aren't materially
      // different.  Reconcile the differences.
      // TODO

      let item = yield this._itemForGUID(response.body.id);
      yield this._updateItemWithServerRecord(item, response.body);
    }
  }),

  /**
   * Phase 1 part 2
   *
   * Uploads new items.
   */
  _uploadNewItems: Task.async(function* () {
    log.debug("Phase 1 part 2: Uploading new items");

    // Get new local items.
    let requests = [];
    yield this.list.forEachItem(localItem => {
      requests.push({
        body: serverRecordFromLocalItem(localItem, NEW_RECORD_PROPERTIES),
      });
    }, { syncStatus: ReadingList.SyncStatus.NEW });
    if (!requests.length) {
      log.debug("No new local items to upload");
      return;
    }

    // Send the request.
    let request = {
      body: {
        defaults: {
          method: "POST",
          path: "/articles",
        },
        requests: requests,
      },
    };
    let batchResponse = yield this._postBatch(request);
    if (batchResponse.status != 200) {
      this._handleUnexpectedResponse(true, "uploading new items", batchResponse);
      return;
    }

    // Update local items based on the response.
    for (let response of batchResponse.body.responses) {
      if (response.status == 303) {
        // "See Other": An item with the URL already exists.  Mark the item as
        // having material changes, and reconcile and upload it in the
        // material-changes phase.
        // TODO
        continue;
      }
      // Note that the server seems to return a 200 if an identical item already
      // exists, but we shouldn't be uploading identical items in this phase in
      // normal usage. But if something goes wrong locally (eg, we upload but
      // get some error even though the upload worked) we will see this.
      // So allow 200 but log a warning.
      if (response.status == 200) {
        log.debug("Attempting to upload a new item found the server already had it",
                  response);
        // but we still process it.
      } else if (response.status != 201) {
        this._handleUnexpectedResponse(false, "uploading a new item", response);
        continue;
      }
      let item = yield this.list.itemForURL(response.body.url);
      yield this._updateItemWithServerRecord(item, response.body);
    }
  }),

  /**
   * Phase 1 part 3
   *
   * Uploads deleted synced items.
   */
  _uploadDeletedItems: Task.async(function* () {
    log.debug("Phase 1 part 3: Uploading deleted items");

    // Get deleted synced local items.
    let requests = [];
    yield this.list.forEachSyncedDeletedGUID(guid => {
      requests.push({
        path: "/articles/" + guid,
      });
    });
    if (!requests.length) {
      log.debug("No local deleted synced items to upload");
      return;
    }

    // Send the request.
    let request = {
      body: {
        defaults: {
          method: "DELETE",
        },
        requests: requests,
      },
    };
    let batchResponse = yield this._postBatch(request);
    if (batchResponse.status != 200) {
      this._handleUnexpectedResponse(true, "uploading deleted items", batchResponse);
      return;
    }

    // Delete local items based on the response.
    for (let response of batchResponse.body.responses) {
      // A 404 means the item was already deleted on the server, which is OK.
      // We still need to make sure it's deleted locally, though.
      if (response.status != 200 && response.status != 404) {
        this._handleUnexpectedResponse(false, "uploading a deleted item", response);
        continue;
      }
      // NOTE(review): for the 404 case this relies on the sub-response body
      // carrying the item's id -- confirm against the server's batch API.
      yield this._deleteItemForGUID(response.body.id);
    }
  }),

  /**
   * Phase 2
   *
   * Downloads items that were modified since the last sync.
   */
  _downloadModifiedItems: Task.async(function* () {
    log.debug("Phase 2: Downloading modified items");

    // Get modified items from the server.
    let path = "/articles";
    if (this._serverLastModifiedHeader) {
      path += "?_since=" + this._serverLastModifiedHeader;
    }
    let request = {
      method: "GET",
      path: path,
    };
    let response = yield this._sendRequest(request);
    if (response.status != 200) {
      this._handleUnexpectedResponse(true, "downloading modified items", response);
      return;
    }

    // Update local items based on the response.
    for (let serverRecord of response.body.items) {
      if (serverRecord.deleted) {
        // _deleteItemForGUID is a no-op if no item exists with the GUID.
        yield this._deleteItemForGUID(serverRecord.id);
        continue;
      }
      let localItem = yield this._itemForGUID(serverRecord.id);
      if (localItem) {
        if (localItem.serverLastModified == serverRecord.last_modified) {
          // We just uploaded this item in the new-items phase.
          continue;
        }
        // The local item may have materially changed.  In that case, don't
        // overwrite the local changes with the server record.  Instead, mark
        // the item as having material changes and reconcile and upload it in
        // the material-changes phase.
        // TODO

        yield this._updateItemWithServerRecord(localItem, serverRecord);
        continue;
      }
      // A potentially new item.  addItem() will fail here when an item was
      // added to the local list between the time we uploaded new items and
      // now.
      let localRecord = localRecordFromServerRecord(serverRecord);
      try {
        yield this.list.addItem(localRecord);
      } catch (ex) {
        if (ex instanceof ReadingList.Error.Exists) {
          log.debug("Tried to add an item that already exists.");
        } else {
          log.error("Error adding an item from server record ${serverRecord} ${ex}",
                    { serverRecord, ex });
        }
      }
    }

    // Now that changes have been successfully applied, advance the server
    // last-modified timestamp so that next time we fetch items starting from
    // the current point.  Response header names are lowercase.
    if (response.headers && "last-modified" in response.headers) {
      this._serverLastModifiedHeader = response.headers["last-modified"];
    }
  }),

  /**
   * Phase 3 (material changes)
   *
   * Uploads not-new items with material changes.
   */
  _uploadMaterialChanges: Task.async(function* () {
    log.debug("Phase 3: Uploading material changes");
    yield this._uploadChanges(ReadingList.SyncStatus.CHANGED_MATERIAL,
                              MUTABLE_RECORD_PROPERTIES);
  }),

  /**
   * Gets the local ReadingListItem with the given GUID.
   *
   * @param guid The item's GUID.
   * @return Promise Resolved with whatever this.list.item() yields for the
   *         GUID.  Callers in this file treat a falsy result as "no such
   *         item in the list".
   */
  _itemForGUID: Task.async(function* (guid) {
    return (yield this.list.item({ guid: guid }));
  }),

  /**
   * Updates the given local ReadingListItem with the given server record.  The
   * local item's sync status is updated to reflect the fact that the item has
   * been synced and is up to date.
   *
   * Failures from the list's updateItem() are logged, not rethrown.
   *
   * @param localItem A local ReadingListItem.  May be null/undefined, in
   *        which case nothing is updated.
   * @param serverRecord A server record representing the item.
   */
  _updateItemWithServerRecord: Task.async(function* (localItem, serverRecord) {
    if (!localItem) {
      // The item may have been deleted from the local list between the time we
      // saw that it needed updating and now.
      log.debug("Tried to update a null local item from server record",
                serverRecord);
      return;
    }
    localItem._record = localRecordFromServerRecord(serverRecord);
    try {
      yield this.list.updateItem(localItem);
    } catch (ex) {
      // The item may have been deleted from the local list after we fetched it.
      if (ex instanceof ReadingList.Error.Deleted) {
        log.debug("Tried to update an item that was deleted from server record",
                  serverRecord);
      } else {
        log.error("Error updating an item from server record ${serverRecord} ${ex}",
                  { serverRecord, ex });
      }
    }
  }),

  /**
   * Truly deletes the local ReadingListItem with the given GUID.  Failures
   * are logged, not rethrown.
   *
   * @param guid The item's GUID.
   */
  _deleteItemForGUID: Task.async(function* (guid) {
    let item = yield this._itemForGUID(guid);
    if (item) {
      // If item is non-null, then it hasn't been deleted locally.  Therefore
      // it's important to delete it through its list so that the list and its
      // consumers are notified properly.  Set the syncStatus to NEW so that the
      // list truly deletes the item.
      item._record.syncStatus = ReadingList.SyncStatus.NEW;
      try {
        yield this.list.deleteItem(item);
      } catch (ex) {
        log.error("Failed delete local item with id ${guid} ${ex}",
                  { guid, ex });
      }
      return;
    }
    // If item is null, then it may not actually exist locally, or it may have
    // been synced and then deleted so that it's marked as being deleted.  In
    // that case, try to delete it directly from the store.  As far as the list
    // is concerned, the item has already been deleted.
    log.debug("Item not present in list, deleting it by GUID instead");
    try {
      // Yield the result so that, if the store deletes asynchronously, we wait
      // for completion and a rejection is caught by this try/catch instead of
      // going unhandled.  Yielding a non-promise value in a Task is harmless.
      yield this.list._store.deleteItemByGUID(guid);
    } catch (ex) {
      log.error("Failed to delete local item with id ${guid} ${ex}",
                { guid, ex });
    }
  }),

  /**
   * Sends a request to the server.
   *
   * @param req The request object: { method, path, body, headers }.
   * @return Promise Resolved with the server's response object:
   *         { status, body, headers }.
   */
  _sendRequest: Task.async(function* (req) {
    log.debug("Sending request", req);
    let response = yield this._client.request(req);
    log.debug("Received response", response);
    return response;
  }),

  /**
   * The server limits the number of sub-requests in POST /batch'es to
   * BATCH_REQUEST_LIMIT.  This method takes an arbitrarily big batch request
   * and breaks it apart into many individual batch requests in order to stay
   * within the limit.
   *
   * The given request object, its body and its array of sub-requests are
   * left unmodified; each chunk is sent in a freshly built request.
   *
   * @param bigRequest The same type of request object that _sendRequest takes.
   *        Since it's a POST /batch request, its `body` should have a
   *        `requests` property whose value is an array of sub-requests.
   *        `method` and `path` are automatically filled.
   * @return Promise Resolved when all requests complete with 200s, or
   *         when the first response that is not a 200 is received.  In the
   *         first case, the resolved response is a combination of all the
   *         server responses, and response.body.responses contains the sub-
   *         responses for all the sub-requests in bigRequest.  In the second
   *         case, the resolved response is the non-200 response straight from
   *         the server.
   */
  _postBatch: Task.async(function* (bigRequest) {
    log.debug("Sending batch requests");
    let allSubResponses = [];
    let subRequests = bigRequest.body.requests;
    for (let start = 0; start < subRequests.length; start += BATCH_REQUEST_LIMIT) {
      // Build a new request and a new body for every chunk.  A shallow copy of
      // bigRequest alone would share `body` with the caller's object, so
      // assigning to body.requests would clobber the caller's request.
      let request = Object.assign({}, bigRequest, {
        method: "POST",
        path: "/batch",
        body: Object.assign({}, bigRequest.body, {
          requests: subRequests.slice(start, start + BATCH_REQUEST_LIMIT),
        }),
      });
      let response = yield this._sendRequest(request);
      if (response.status != 200) {
        return response;
      }
      allSubResponses = allSubResponses.concat(response.body.responses);
    }
    let bigResponse = {
      status: 200,
      body: {
        responses: allSubResponses,
      },
    };
    log.debug("All batch requests successfully sent");
    return bigResponse;
  }),

  /**
   * Logs an unexpected server response and, for some top-level responses,
   * aborts sync by throwing.
   *
   * @param isTopLevel True if the response is for a whole request (as opposed
   *        to a sub-response within a batch response).
   * @param contextMsgFragment A string describing what was being done when
   *        the response was received; appended to the log message.
   * @param response The response object; its `status` is examined.
   * @throws Error if isTopLevel is true and the status is 401 or >= 500.
   */
  _handleUnexpectedResponse(isTopLevel, contextMsgFragment, response) {
    log.error(`Unexpected response ${contextMsgFragment}`, response);
    // We want to throw in some cases so the sync engine knows there was an
    // error and retries using the error schedule. 401 implies an auth issue
    // (possibly transient, possibly not) - but things like 404 might just
    // relate to a single item and need not throw.  Any 5XX implies a
    // (hopefully transient) server error.
    if (isTopLevel && (response.status == 401 || response.status >= 500)) {
      throw new Error("Sync aborted due to " + response.status + " server response.");
    }
  },

  // TODO: Wipe this pref when user logs out.
  // The value is read from the pref once and then cached on this object;
  // setting it updates both the cached value and the pref.
  get _serverLastModifiedHeader() {
    if (!("__serverLastModifiedHeader" in this)) {
      this.__serverLastModifiedHeader =
        Preferences.get(SERVER_LAST_MODIFIED_HEADER_PREF, undefined);
    }
    return this.__serverLastModifiedHeader;
  },
  set _serverLastModifiedHeader(val) {
    this.__serverLastModifiedHeader = val;
    Preferences.set(SERVER_LAST_MODIFIED_HEADER_PREF, val);
  },
};


/**
 * Translates a local ReadingListItem into a server record.
 *
 * @param localItem The local ReadingListItem.
 * @param localProperties An array of local item property names.  Only these
 *        properties will be included in the server record.
 * @return The server record.
 */
function serverRecordFromLocalItem(localItem, localProperties) {
  let serverRecord = {};
  for (let localProp of localProperties) {
    let serverProp = SERVER_PROPERTIES_BY_LOCAL_PROPERTIES[localProp];
    // Only properties actually present on the item's record are copied.
    if (localProp in localItem._record) {
      serverRecord[serverProp] = localItem._record[localProp];
    }
  }
  return serverRecord;
}

/**
 * Translates a server record into a local record.  The returned local record's
 * syncStatus will reflect the fact that the local record is up-to-date synced.
 *
 * @param serverRecord The server record.
 * @return The local record.
 */
function localRecordFromServerRecord(serverRecord) {
  let localRecord = {
    // Mark the record as being up-to-date synced.
    syncStatus: ReadingList.SyncStatus.SYNCED,
  };
  for (let localProp in SERVER_PROPERTIES_BY_LOCAL_PROPERTIES) {
    let serverProp = SERVER_PROPERTIES_BY_LOCAL_PROPERTIES[localProp];
    // Only properties actually present on the server record are copied.
    if (serverProp in serverRecord) {
      localRecord[localProp] = serverRecord[serverProp];
    }
  }
  return localRecord;
}

// The exported Sync object: a SyncImpl for the global ReadingList, created
// the first time it is accessed and reused afterwards.
Object.defineProperty(this, "Sync", {
  get() {
    if (!this._singleton) {
      this._singleton = new SyncImpl(ReadingList);
    }
    return this._singleton;
  },
});