Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add quota limits for push command #545

Merged
merged 11 commits into from
Dec 10, 2020
Prev Previous commit
Next Next commit
Invalidate cache when repo is removed
  • Loading branch information
ikhoon committed Dec 7, 2020
commit d1373f5a5dc5c4d2db743f081619a0abc30fd218
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class StandaloneCommandExecutor extends AbstractCommandExecutor {
private final Executor repositoryWorker;
@Nullable
private final SessionManager sessionManager;
// if permitsPerSecond is -1, a quota is checked by ZooKeeperCommandExecutor.
private final double permitsPerSecond;
private final MetadataService metadataService;

Expand Down Expand Up @@ -225,7 +226,9 @@ private CompletableFuture<Void> createRepository(CreateRepositoryCommand c) {
}

private CompletableFuture<Void> removeRepository(RemoveRepositoryCommand c) {
writeRateLimiters.remove(rateLimiterKey(c.projectName(), c.repositoryName()));
if (writeQuotaEnabled()) {
writeRateLimiters.remove(rateLimiterKey(c.projectName(), c.repositoryName()));
}
return CompletableFuture.supplyAsync(() -> {
projectManager.get(c.projectName()).repos().remove(c.repositoryName());
return null;
Expand All @@ -248,7 +251,7 @@ private CompletableFuture<Void> purgeRepository(PurgeRepositoryCommand c) {

private CompletableFuture<Revision> push(PushCommand c) {
if (c.projectName().equals(INTERNAL_PROJ) || c.repositoryName().equals(Project.REPO_DOGMA) ||
Double.compare(permitsPerSecond, -1) == 0) {
!writeQuotaEnabled()) {
return push0(c);
}

Expand Down Expand Up @@ -305,6 +308,10 @@ private static String rateLimiterKey(String projectName, String repoName) {
return projectName + '/' + repoName;
}

private boolean writeQuotaEnabled() {
return Double.compare(permitsPerSecond, -1) > 0;
}

private Repository repo(RepositoryCommand<?> c) {
return projectManager.get(c.projectName()).repos().get(c.repositoryName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import com.linecorp.centraldogma.server.command.Command;
import com.linecorp.centraldogma.server.command.CommandExecutor;
import com.linecorp.centraldogma.server.command.PushCommand;
import com.linecorp.centraldogma.server.command.RemoveRepositoryCommand;
import com.linecorp.centraldogma.server.metadata.MetadataService;
import com.linecorp.centraldogma.server.metadata.RepositoryMetadata;
import com.linecorp.centraldogma.server.storage.project.Project;
Expand Down Expand Up @@ -729,6 +730,9 @@ private synchronized void replayLogs(long targetRevision) {
": " + actualResult + " (expected: " + expectedResult +
", command: " + command + ')');
}
if (command instanceof RemoveRepositoryCommand) {
clearWriteQuota((RemoveRepositoryCommand) command);
}
} else {
// same replicaId. skip
}
Expand Down Expand Up @@ -779,6 +783,8 @@ private SafeCloseable safeLock(Command<?> command) {
mtx.acquire();
if (command instanceof PushCommand) {
writeLock = acquireWriteLock((PushCommand) command);
} else if (command instanceof RemoveRepositoryCommand) {
clearWriteQuota((RemoveRepositoryCommand) command);
}
} catch (Exception e) {
logger.error("Failed to acquire a lock for {}; entering read-only mode", executionPath, e);
Expand All @@ -793,6 +799,12 @@ private SafeCloseable safeLock(Command<?> command) {
return () -> safeRelease(mtx);
}

private void clearWriteQuota(RemoveRepositoryCommand command) {
final String cacheKey = rateLimiterKey(command.projectName(), command.repositoryName());
semaphoreMap.remove(cacheKey);
writeQuotaCache.invalidate(cacheKey);
}

private static void safeRelease(InterProcessMutex mtx) {
try {
mtx.release();
Expand Down