Bug 726054 - Desktop parity: track last sync time from final upload, not final fetch. r=nalexander

This commit is contained in:
Richard Newman
2012-02-15 22:05:53 -08:00
parent ec77ce433e
commit ddcce24b1c
11 changed files with 86 additions and 40 deletions

View File

@@ -146,7 +146,7 @@ public class Crypto5MiddlewareRepositorySession extends RepositorySession {
} }
@Override @Override
public void onFetchSucceeded(Record[] records, long end) { public void onFetchSucceeded(Record[] records, long fetchEnd) {
for (Record record : records) { for (Record record : records) {
try { try {
this.onFetchedRecord(record); this.onFetchedRecord(record);
@@ -154,12 +154,12 @@ public class Crypto5MiddlewareRepositorySession extends RepositorySession {
this.onFetchFailed(e, record); this.onFetchFailed(e, record);
} }
} }
this.onFetchCompleted(end); this.onFetchCompleted(fetchEnd);
} }
@Override @Override
public void onFetchCompleted(long end) { public void onFetchCompleted(final long fetchEnd) {
next.onFetchCompleted(end); next.onFetchCompleted(fetchEnd);
} }
@Override @Override
@@ -237,4 +237,9 @@ public class Crypto5MiddlewareRepositorySession extends RepositorySession {
public void storeDone() { public void storeDone() {
inner.storeDone(); inner.storeDone();
} }
@Override
public void storeDone(long end) {
inner.storeDone(end);
}
} }

View File

@@ -147,11 +147,18 @@ public abstract class RepositorySession {
public abstract void store(Record record) throws NoStoreDelegateException; public abstract void store(Record record) throws NoStoreDelegateException;
public void storeDone() { public void storeDone() {
// Our default behavior will be to assume that the Runnable is
// executed as soon as all the stores synchronously finish, so
// our end timestamp can just be… now.
storeDone(now());
}
public void storeDone(final long end) {
Log.d(LOG_TAG, "Scheduling onStoreCompleted for after storing is done."); Log.d(LOG_TAG, "Scheduling onStoreCompleted for after storing is done.");
Runnable command = new Runnable() { Runnable command = new Runnable() {
@Override @Override
public void run() { public void run() {
delegate.onStoreCompleted(); delegate.onStoreCompleted(end);
} }
}; };
storeWorkQueue.execute(command); storeWorkQueue.execute(command);

View File

@@ -41,6 +41,7 @@ import java.io.IOException;
import org.json.simple.parser.ParseException; import org.json.simple.parser.ParseException;
import org.mozilla.gecko.sync.ExtendedJSONObject; import org.mozilla.gecko.sync.ExtendedJSONObject;
import org.mozilla.gecko.sync.Logger;
import org.mozilla.gecko.sync.NonObjectJSONException; import org.mozilla.gecko.sync.NonObjectJSONException;
import android.util.Log; import android.util.Log;
@@ -75,8 +76,11 @@ public class RepositorySessionBundle extends ExtendedJSONObject {
} }
public void bumpTimestamp(long timestamp) { public void bumpTimestamp(long timestamp) {
if (timestamp > this.getTimestamp()) { long existing = this.getTimestamp();
if (timestamp > existing) {
this.setTimestamp(timestamp); this.setTimestamp(timestamp);
} else {
Logger.debug(LOG_TAG, "Timestamp " + timestamp + " not greater than " + existing + "; not bumping.");
} }
} }
} }

View File

@@ -44,6 +44,7 @@ import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;
import org.json.simple.JSONArray; import org.json.simple.JSONArray;
import org.mozilla.gecko.sync.CryptoRecord; import org.mozilla.gecko.sync.CryptoRecord;
@@ -186,6 +187,20 @@ public class Server11RepositorySession extends RepositorySession {
Server11Repository serverRepository; Server11Repository serverRepository;
AtomicLong uploadTimestamp = new AtomicLong(0);
private void bumpUploadTimestamp(long ts) {
while (true) {
long existing = uploadTimestamp.get();
if (existing > ts) {
return;
}
if (uploadTimestamp.compareAndSet(existing, ts)) {
return;
}
}
}
public Server11RepositorySession(Repository repository) { public Server11RepositorySession(Repository repository) {
super(repository); super(repository);
serverRepository = (Server11Repository) repository; serverRepository = (Server11Repository) repository;
@@ -320,7 +335,7 @@ public class Server11RepositorySession extends RepositorySession {
public void storeDone() { public void storeDone() {
synchronized (recordsBufferMonitor) { synchronized (recordsBufferMonitor) {
flush(); flush();
super.storeDone(); storeDone(uploadTimestamp.get());
} }
} }
@@ -379,6 +394,10 @@ public class Server11RepositorySession extends RepositorySession {
(success.size() > 0)) { (success.size() > 0)) {
Log.d(LOG_TAG, "Successful records: " + success.toString()); Log.d(LOG_TAG, "Successful records: " + success.toString());
// TODO: how do we notify without the whole record? // TODO: how do we notify without the whole record?
long ts = response.normalizedWeaveTimestamp();
Log.d(LOG_TAG, "Passing back upload X-Weave-Timestamp: " + ts);
bumpUploadTimestamp(ts);
} }
if ((failed != null) && if ((failed != null) &&
(failed.object.size() > 0)) { (failed.object.size() > 0)) {

View File

@@ -61,11 +61,11 @@ public class DeferredRepositorySessionFetchRecordsDelegate implements Repository
} }
@Override @Override
public void onFetchSucceeded(final Record[] records, final long end) { public void onFetchSucceeded(final Record[] records, final long fetchEnd) {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
inner.onFetchSucceeded(records, end); inner.onFetchSucceeded(records, fetchEnd);
} }
}); });
} }
@@ -81,11 +81,11 @@ public class DeferredRepositorySessionFetchRecordsDelegate implements Repository
} }
@Override @Override
public void onFetchCompleted(final long end) { public void onFetchCompleted(final long fetchEnd) {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
inner.onFetchCompleted(end); inner.onFetchCompleted(fetchEnd);
} }
}); });
} }
@@ -97,4 +97,4 @@ public class DeferredRepositorySessionFetchRecordsDelegate implements Repository
} }
throw new IllegalArgumentException("Can't re-defer this delegate."); throw new IllegalArgumentException("Can't re-defer this delegate.");
} }
} }

View File

@@ -81,12 +81,12 @@ public class DeferredRepositorySessionStoreDelegate implements
} }
@Override @Override
public void onStoreCompleted() { public void onStoreCompleted(final long storeEnd) {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
inner.onStoreCompleted(); inner.onStoreCompleted(storeEnd);
} }
}); });
} }
} }

View File

@@ -55,11 +55,11 @@ public interface RepositorySessionFetchRecordsDelegate {
* which the request was received. * which the request was received.
* E.g., the (normalized) value of the X-Weave-Timestamp header. * E.g., the (normalized) value of the X-Weave-Timestamp header.
*/ */
public void onFetchCompleted(long end); public void onFetchCompleted(final long fetchEnd);
// Shorthand for calling onFetchedRecord for each record in turn, then // Shorthand for calling onFetchedRecord for each record in turn, then
// calling onFetchCompleted. // calling onFetchCompleted.
public void onFetchSucceeded(Record[] records, long end); public void onFetchSucceeded(Record[] records, final long fetchEnd);
public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor); public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor);
} }

View File

@@ -51,6 +51,6 @@ import org.mozilla.gecko.sync.repositories.domain.Record;
public interface RepositorySessionStoreDelegate { public interface RepositorySessionStoreDelegate {
public void onRecordStoreFailed(Exception ex); public void onRecordStoreFailed(Exception ex);
public void onRecordStoreSucceeded(Record record); public void onRecordStoreSucceeded(Record record);
public void onStoreCompleted(); public void onStoreCompleted(long storeEnd);
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor); public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor);
} }

View File

@@ -51,8 +51,6 @@ import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecor
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import org.mozilla.gecko.sync.repositories.domain.Record; import org.mozilla.gecko.sync.repositories.domain.Record;
import android.util.Log;
/** /**
* Pulls records from `source`, applying them to `sink`. * Pulls records from `source`, applying them to `sink`.
* Notifies its delegate of errors and completion. * Notifies its delegate of errors and completion.
@@ -104,7 +102,7 @@ class RecordsChannel implements
public RepositorySession sink; public RepositorySession sink;
private RecordsChannelDelegate delegate; private RecordsChannelDelegate delegate;
private long timestamp; private long timestamp;
private long end = -1; // Oo er, missus. private long fetchEnd = -1;
public RecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) { public RecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) {
this.source = source; this.source = source;
@@ -198,19 +196,19 @@ class RecordsChannel implements
} }
@Override @Override
public void onFetchSucceeded(Record[] records, long end) { public void onFetchSucceeded(Record[] records, final long fetchEnd) {
for (Record record : records) { for (Record record : records) {
this.toProcess.add(record); this.toProcess.add(record);
} }
this.consumer.doNotify(); this.consumer.doNotify();
this.onFetchCompleted(end); this.onFetchCompleted(fetchEnd);
} }
@Override @Override
public void onFetchCompleted(long end) { public void onFetchCompleted(final long fetchEnd) {
Logger.info(LOG_TAG, "onFetchCompleted. Stopping consumer once stores are done."); Logger.info(LOG_TAG, "onFetchCompleted. Stopping consumer once stores are done.");
Logger.info(LOG_TAG, "Fetch timestamp is " + end); Logger.info(LOG_TAG, "Fetch timestamp is " + fetchEnd);
this.end = end; this.fetchEnd = fetchEnd;
this.consumer.queueFilled(); this.consumer.queueFilled();
} }
@@ -237,10 +235,11 @@ class RecordsChannel implements
} }
@Override @Override
public void onStoreCompleted() { public void onStoreCompleted(long storeEnd) {
Logger.info(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted. End is " + end); Logger.info(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted. " +
"Fetch end is " + fetchEnd + ", store end is " + storeEnd);
// TODO: synchronize on consumer callback? // TODO: synchronize on consumer callback?
delegate.onFlowCompleted(this, end); delegate.onFlowCompleted(this, fetchEnd, storeEnd);
} }
@Override @Override

View File

@@ -38,7 +38,7 @@
package org.mozilla.gecko.sync.synchronizer; package org.mozilla.gecko.sync.synchronizer;
public interface RecordsChannelDelegate { public interface RecordsChannelDelegate {
public void onFlowCompleted(RecordsChannel recordsChannel, long end); public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd);
public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex); public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex);
public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex); public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex);
public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex); public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex);

View File

@@ -66,8 +66,15 @@ implements RecordsChannelDelegate,
private RepositorySession sessionB; private RepositorySession sessionB;
private RepositorySessionBundle bundleA; private RepositorySessionBundle bundleA;
private RepositorySessionBundle bundleB; private RepositorySessionBundle bundleB;
// Bug 726054: just like desktop, we track our last interaction with the server,
// not the last record timestamp that we fetched. This ensures that we don't re-
// download the records we just uploaded, at the cost of skipping any records
// that a concurrently syncing client has uploaded.
private long pendingATimestamp = -1; private long pendingATimestamp = -1;
private long pendingBTimestamp = -1; private long pendingBTimestamp = -1;
private long storeEndATimestamp = -1;
private long storeEndBTimestamp = -1;
private boolean flowAToBCompleted = false; private boolean flowAToBCompleted = false;
private boolean flowBToACompleted = false; private boolean flowBToACompleted = false;
@@ -133,9 +140,11 @@ implements RecordsChannelDelegate,
// TODO: failed record handling. // TODO: failed record handling.
final RecordsChannel channelBToA = new RecordsChannel(this.sessionB, this.sessionA, this); final RecordsChannel channelBToA = new RecordsChannel(this.sessionB, this.sessionA, this);
RecordsChannelDelegate channelDelegate = new RecordsChannelDelegate() { RecordsChannelDelegate channelDelegate = new RecordsChannelDelegate() {
public void onFlowCompleted(RecordsChannel recordsChannel, long end) { public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
info("First RecordsChannel flow completed. End is " + end + ". Starting next."); info("First RecordsChannel flow completed. Fetch end is " + fetchEnd +
pendingATimestamp = end; ". Store end is " + storeEnd + ". Starting next.");
pendingATimestamp = fetchEnd;
storeEndBTimestamp = storeEnd;
flowAToBCompleted = true; flowAToBCompleted = true;
channelBToA.flow(); channelBToA.flow();
} }
@@ -165,9 +174,12 @@ implements RecordsChannelDelegate,
} }
@Override @Override
public void onFlowCompleted(RecordsChannel channel, long end) { public void onFlowCompleted(RecordsChannel channel, long fetchEnd, long storeEnd) {
info("Second RecordsChannel flow completed. End is " + end + ". Finishing."); info("Second RecordsChannel flow completed. Fetch end is " + fetchEnd +
pendingBTimestamp = end; ". Store end is " + storeEnd + ". Finishing.");
pendingBTimestamp = fetchEnd;
storeEndATimestamp = storeEnd;
flowBToACompleted = true; flowBToACompleted = true;
// Finish the two sessions. // Finish the two sessions.
@@ -278,8 +290,8 @@ implements RecordsChannelDelegate,
if (session == sessionA) { if (session == sessionA) {
if (flowAToBCompleted) { if (flowAToBCompleted) {
info("onFinishSucceeded: bumping session A's timestamp to " + pendingATimestamp); info("onFinishSucceeded: bumping session A's timestamp to " + pendingATimestamp + " or " + storeEndATimestamp);
bundle.bumpTimestamp(pendingATimestamp); bundle.bumpTimestamp(Math.max(pendingATimestamp, storeEndATimestamp));
this.synchronizer.bundleA = bundle; this.synchronizer.bundleA = bundle;
} }
if (this.sessionB != null) { if (this.sessionB != null) {
@@ -289,8 +301,8 @@ implements RecordsChannelDelegate,
} }
} else if (session == sessionB) { } else if (session == sessionB) {
if (flowBToACompleted) { if (flowBToACompleted) {
info("onFinishSucceeded: bumping session B's timestamp to " + pendingBTimestamp); info("onFinishSucceeded: bumping session B's timestamp to " + pendingBTimestamp + " or " + storeEndBTimestamp);
bundle.bumpTimestamp(pendingBTimestamp); bundle.bumpTimestamp(Math.max(pendingBTimestamp, storeEndBTimestamp));
this.synchronizer.bundleB = bundle; this.synchronizer.bundleB = bundle;
info("Notifying delegate.onSynchronized."); info("Notifying delegate.onSynchronized.");
this.delegate.onSynchronized(this); this.delegate.onSynchronized(this);