Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add quota limits for push command #545

Merged
merged 11 commits into from
Dec 10, 2020
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.centraldogma.common;

import static java.util.Objects.requireNonNull;

import javax.annotation.Nullable;

/**
* A {@link CentralDogmaException} that is raised when a client is attempting to send requests more than
* quota limits.
*/
public class TooManyRequestsException extends CentralDogmaException {

private static final long serialVersionUID = 1712601138432866984L;

@Nullable
private String type;

/**
* Creates a new instance.
*/
public TooManyRequestsException() {}

/**
* Creates a new instance.
*/
public TooManyRequestsException(String type, String path, double permitsPerSecond) {
this('\'' + path + "' (quota limit: " + permitsPerSecond + "/sec)");
this.type = requireNonNull(type, "type");
}

/**
* Creates a new instance.
*/
public TooManyRequestsException(String message) {
super(message);
}

/**
* Creates a new instance.
*/
public TooManyRequestsException(Throwable cause) {
super(cause);
}

/**
* Creates a new instance.
*/
public TooManyRequestsException(String message, Throwable cause) {
super(message, cause);
}

/**
* Creates a new instance.
*
* @param message the detail message
* @param writableStackTrace whether or not the stack trace should be writable
*/
public TooManyRequestsException(String message, boolean writableStackTrace) {
super(message, writableStackTrace);
}

/**
* Creates a new instance.
*/
protected TooManyRequestsException(String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}

/**
* Returns the {@code type} specified when creating this {@link Exception}.
*/
@Nullable
public String type() {
return type;
}
}

3 changes: 3 additions & 0 deletions it/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@ dependencies {
// JSch
testImplementation 'com.jcraft:jsch'

testImplementation 'org.apache.curator:curator-test'

testImplementation project(':testing-internal')
testImplementation project(':server-auth:shiro')
}
5 changes: 3 additions & 2 deletions it/src/test/java/com/linecorp/centraldogma/it/CacheTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,13 @@ void getFile(ClientType clientType, TestInfo testInfo) {
Change.ofTextUpsert("/foo.txt", "bar")).join();

final Map<String, Double> meters2 = metersSupplier.get();
// Metadata needs to access to check a write quota (one cache miss).
if (clientType == ClientType.LEGACY) {
// NB: A push operation involves a history() operation to retrieve the last commit.
// Therefore we should observe one cache miss. (Thrift only)
assertThat(missCount(meters2)).isEqualTo(missCount(meters1) + 1);
assertThat(missCount(meters2)).isEqualTo(missCount(meters1) + 2);
} else {
assertThat(missCount(meters2)).isEqualTo(missCount(meters1));
assertThat(missCount(meters2)).isEqualTo(missCount(meters1) + 1);
}

// First getFile() should miss.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.it;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.awaitility.Awaitility.await;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.curator.test.InstanceSpec;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;

import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.auth.OAuth2Token;
import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.client.armeria.ArmeriaCentralDogmaBuilder;
import com.linecorp.centraldogma.internal.Jackson;
import com.linecorp.centraldogma.internal.api.v1.AccessToken;
import com.linecorp.centraldogma.server.CentralDogmaBuilder;
import com.linecorp.centraldogma.server.GracefulShutdownTimeout;
import com.linecorp.centraldogma.server.ZooKeeperReplicationConfig;
import com.linecorp.centraldogma.server.ZooKeeperServerConfig;
import com.linecorp.centraldogma.server.auth.AuthProviderFactory;
import com.linecorp.centraldogma.testing.internal.TemporaryFolderExtension;
import com.linecorp.centraldogma.testing.internal.auth.TestAuthMessageUtil;
import com.linecorp.centraldogma.testing.internal.auth.TestAuthProviderFactory;

class ReplicationWriteQuotaTest extends WriteQuotaTestBase {

private static final AuthProviderFactory factory = new TestAuthProviderFactory();

@RegisterExtension
static final TemporaryFolderExtension tempDir = new TemporaryFolderExtension();

private WebClient webClient;
private CentralDogma dogmaClient;

@Override
protected WebClient webClient() {
return webClient;
}

@Override
protected CentralDogma dogmaClient() {
return dogmaClient;
}

@BeforeEach
void setUp() throws IOException {
final int port1 = InstanceSpec.getRandomPort();
final int port2 = InstanceSpec.getRandomPort();
final int port3 = InstanceSpec.getRandomPort();

final Map<Integer, ZooKeeperServerConfig> servers = randomServerConfigs(3);

final CompletableFuture<Void> r1 = startNewReplicaWithRetries(port1, 1, servers);
final CompletableFuture<Void> r2 = startNewReplicaWithRetries(port2, 2, servers);
final CompletableFuture<Void> r3 = startNewReplicaWithRetries(port3, 3, servers);
r1.join();
r2.join();
r3.join();

final String adminSessionId =
getSessionId(port1, TestAuthMessageUtil.USERNAME, TestAuthMessageUtil.PASSWORD);

webClient = WebClient.builder("http://127.0.0.1:" + port1)
.auth(OAuth2Token.of(adminSessionId))
.build();
dogmaClient = new ArmeriaCentralDogmaBuilder()
.accessToken(adminSessionId)
.host(webClient.uri().getHost(), webClient.uri().getPort())
.build();
}

private static Map<Integer, ZooKeeperServerConfig> randomServerConfigs(int numReplicas) {
final ImmutableMap.Builder<Integer, ZooKeeperServerConfig> builder =
ImmutableMap.builderWithExpectedSize(numReplicas);
for (int i = 0; i < numReplicas; i++) {
final int zkQuorumPort = InstanceSpec.getRandomPort();
final int zkElectionPort = InstanceSpec.getRandomPort();
final int zkClientPort = InstanceSpec.getRandomPort();

builder.put(i + 1, new ZooKeeperServerConfig("127.0.0.1", zkQuorumPort, zkElectionPort,
zkClientPort, /* groupId */ null, /* weight */ 1));
}
return builder.build();
}

private static CompletableFuture<Void> startNewReplicaWithRetries(
int port, int serverId, Map<Integer, ZooKeeperServerConfig> servers) throws IOException {
final AtomicReference<CompletableFuture<Void>> futureRef = new AtomicReference<>();
await().pollInSameThread().pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
assertThatCode(() -> {
futureRef.set(startNewReplica(port, serverId, servers));
}).doesNotThrowAnyException();
});
return futureRef.get();
}

private static CompletableFuture<Void> startNewReplica(
int port, int serverId, Map<Integer, ZooKeeperServerConfig> servers) throws IOException {
return new CentralDogmaBuilder(tempDir.newFolder().toFile())
.port(port, SessionProtocol.HTTP)
.administrators(TestAuthMessageUtil.USERNAME)
.authProviderFactory(factory)
.mirroringEnabled(false)
.writeQuotaPerRepository(5, 1)
.gracefulShutdownTimeout(new GracefulShutdownTimeout(0, 0))
.replication(new ZooKeeperReplicationConfig(serverId, servers))
.build().start();
}

private static String getSessionId(int port1, String username, String password)
throws JsonProcessingException {
final WebClient client = WebClient.of("http://127.0.0.1:" + port1);
final AggregatedHttpResponse response =
TestAuthMessageUtil.login(client, username, password);

assertThat(response.status()).isEqualTo(HttpStatus.OK);
return Jackson.readValue(response.content().array(), AccessToken.class).accessToken();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.it;

import static org.assertj.core.api.Assertions.assertThat;

import java.net.URI;
import java.net.UnknownHostException;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.fasterxml.jackson.core.JsonProcessingException;

import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.auth.OAuth2Token;
import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.client.armeria.ArmeriaCentralDogmaBuilder;
import com.linecorp.centraldogma.internal.Jackson;
import com.linecorp.centraldogma.internal.api.v1.AccessToken;
import com.linecorp.centraldogma.server.CentralDogmaBuilder;
import com.linecorp.centraldogma.testing.internal.auth.TestAuthMessageUtil;
import com.linecorp.centraldogma.testing.internal.auth.TestAuthProviderFactory;
import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension;

class StandaloneWriteQuotaTest extends WriteQuotaTestBase {

@RegisterExtension
static final CentralDogmaExtension dogma = new CentralDogmaExtension() {
@Override
protected void configure(CentralDogmaBuilder builder) {
builder.administrators(TestAuthMessageUtil.USERNAME);
builder.authProviderFactory(new TestAuthProviderFactory());
// Default write quota
builder.writeQuotaPerRepository(5, 1);
}
};

private WebClient webClient;
private CentralDogma dogmaClient;

@BeforeEach
void setUp() throws JsonProcessingException, UnknownHostException {
final String adminSessionId = getSessionId(TestAuthMessageUtil.USERNAME, TestAuthMessageUtil.PASSWORD);
final URI uri = dogma.httpClient().uri();

webClient = WebClient.builder(uri)
.auth(OAuth2Token.of(adminSessionId))
.build();
dogmaClient = new ArmeriaCentralDogmaBuilder()
.accessToken(adminSessionId)
.host(uri.getHost(), uri.getPort())
.build();
}

@Override
protected WebClient webClient() {
return webClient;
}

@Override
protected CentralDogma dogmaClient() {
return dogmaClient;
}

private static String getSessionId(String username, String password) throws JsonProcessingException {
final AggregatedHttpResponse response =
TestAuthMessageUtil.login(dogma.httpClient(), username, password);

assertThat(response.status()).isEqualTo(HttpStatus.OK);
return Jackson.readValue(response.content().array(), AccessToken.class).accessToken();
}
}
Loading