Skip to content

Commit

Permalink
Provide a way to listen to the events of mirroring (#1057)
Browse files Browse the repository at this point in the history
Motivation:

Mirroring failure is only recorded as a warning, but there is a need to
handle it in a different way. For example, it can be recorded in metrics
or end-users can be notified immediately.

`MirrorListener` is provided as an extension point to utilize various
events occurring in the mirror.

Modifications:

- Introduce `MirrorListener` whose implementations can be loaded
dynamically via Java SPI.
- `onStart()`, `onComplete()` and `onError()` events are added.
- The default behavior is preserved in `DefaultMirrorListener` which is
only used when no custom `MirrorListener` is configured.

Result:

You can now use `MirrorListener` to listen to `Mirror` events.
  • Loading branch information
ikhoon authored Nov 12, 2024
1 parent b199e62 commit bed87a2
Show file tree
Hide file tree
Showing 20 changed files with 681 additions and 55 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.it.mirror.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.File;
import java.net.URI;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import com.cronutils.model.Cron;
import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.parser.CronParser;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import com.linecorp.centraldogma.server.command.CommandExecutor;
import com.linecorp.centraldogma.server.credential.Credential;
import com.linecorp.centraldogma.server.internal.mirror.AbstractMirror;
import com.linecorp.centraldogma.server.internal.mirror.MirrorSchedulingService;
import com.linecorp.centraldogma.server.mirror.Mirror;
import com.linecorp.centraldogma.server.mirror.MirrorDirection;
import com.linecorp.centraldogma.server.mirror.MirrorResult;
import com.linecorp.centraldogma.server.mirror.MirrorStatus;
import com.linecorp.centraldogma.server.storage.project.Project;
import com.linecorp.centraldogma.server.storage.project.ProjectManager;
import com.linecorp.centraldogma.server.storage.repository.MetaRepository;
import com.linecorp.centraldogma.server.storage.repository.Repository;

import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

class CustomMirrorListenerTest {

private static final Cron EVERY_SECOND = new CronParser(
CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)).parse("* * * * * ?");

@TempDir
static File temporaryFolder;

@BeforeEach
void setUp() {
TestMirrorListener.reset();
}

@AfterEach
void tearDown() {
TestMirrorListener.reset();
}

@Test
void shouldNotifyMirrorEvents() {
final AtomicInteger taskCounter = new AtomicInteger();
final ProjectManager pm = mock(ProjectManager.class);
final Project p = mock(Project.class);
final MetaRepository mr = mock(MetaRepository.class);
final Repository r = mock(Repository.class);
when(pm.list()).thenReturn(ImmutableMap.of("foo", p));
when(p.name()).thenReturn("foo");
when(p.metaRepo()).thenReturn(mr);
when(r.parent()).thenReturn(p);
when(r.name()).thenReturn("bar");

final Mirror mirror = new AbstractMirror("my-mirror-1", true, EVERY_SECOND,
MirrorDirection.REMOTE_TO_LOCAL,
Credential.FALLBACK, r, "/",
URI.create("unused://uri"), "/", "", null) {
@Override
protected MirrorResult mirrorLocalToRemote(File workDir, int maxNumFiles, long maxNumBytes,
Instant triggeredTime) {
throw new UnsupportedOperationException();
}

@Override
protected MirrorResult mirrorRemoteToLocal(File workDir, CommandExecutor executor,
int maxNumFiles, long maxNumBytes, Instant triggeredTime)
throws Exception {
final int counter = taskCounter.incrementAndGet();
if (counter == 1) {
return newMirrorResult(MirrorStatus.SUCCESS, "1", Instant.now());
} else if (counter == 2) {
return newMirrorResult(MirrorStatus.UP_TO_DATE, "2", Instant.now());
} else {
throw new IllegalStateException("failed");
}
}
};

when(mr.mirrors()).thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mirror)));

final MirrorSchedulingService service = new MirrorSchedulingService(
temporaryFolder, pm, new SimpleMeterRegistry(), 1, 1, 1);
final CommandExecutor executor = mock(CommandExecutor.class);
service.start(executor);

try {
await().until(() -> taskCounter.get() >= 3);
} finally {
service.stop();
}
final Integer startCount = TestMirrorListener.startCount.get(mirror);
assertThat(startCount).isGreaterThanOrEqualTo(3);

final List<MirrorResult> completions = TestMirrorListener.completions.get(mirror);
assertThat(completions).hasSize(2);
assertThat(completions.get(0).mirrorStatus()).isEqualTo(MirrorStatus.SUCCESS);
assertThat(completions.get(1).mirrorStatus()).isEqualTo(MirrorStatus.UP_TO_DATE);

final List<Throwable> errors = TestMirrorListener.errors.get(mirror);
assertThat(errors).hasSizeGreaterThanOrEqualTo(1);
assertThat(errors.get(0).getCause())
.isInstanceOf(IllegalStateException.class)
.hasMessage("failed");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.it.mirror.listener;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.linecorp.centraldogma.server.mirror.Mirror;
import com.linecorp.centraldogma.server.mirror.MirrorListener;
import com.linecorp.centraldogma.server.mirror.MirrorResult;
import com.linecorp.centraldogma.server.mirror.MirrorTask;

public final class TestMirrorListener implements MirrorListener {

static final Map<Mirror, Integer> startCount = new ConcurrentHashMap<>();
static final Map<Mirror, List<MirrorResult>> completions = new ConcurrentHashMap<>();
static final Map<Mirror, List<Throwable>> errors = new ConcurrentHashMap<>();

static void reset() {
startCount.clear();
completions.clear();
errors.clear();
}

@Override
public void onStart(MirrorTask mirror) {
startCount.merge(mirror.mirror(), 1, Integer::sum);
}

@Override
public void onComplete(MirrorTask mirror, MirrorResult result) {
final List<MirrorResult> results = new ArrayList<>();
results.add(result);
completions.merge(mirror.mirror(), results, (oldValue, newValue) -> {
oldValue.addAll(newValue);
return oldValue;
});
}

@Override
public void onError(MirrorTask mirror, Throwable cause) {
final List<Throwable> exceptions = new ArrayList<>();
exceptions.add(cause);
errors.merge(mirror.mirror(), exceptions, (oldValue, newValue) -> {
oldValue.addAll(newValue);
return oldValue;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.linecorp.centraldogma.it.mirror.listener.TestMirrorListener
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -90,6 +91,7 @@ void setUp() throws Exception {
.auth(AuthToken.ofOAuth2(adminToken))
.build()
.blocking();
TestMirrorRunnerListener.reset();
}

@Test
Expand Down Expand Up @@ -134,6 +136,14 @@ void triggerMirroring() throws Exception {
.contains("Repository 'foo/bar' already at");
}
}

final String listenerKey = FOO_PROJ + '/' + TEST_MIRROR_ID + '/' + USERNAME;
assertThat(TestMirrorRunnerListener.startCount.get(listenerKey)).isEqualTo(3);
final List<MirrorResult> results = TestMirrorRunnerListener.completions.get(listenerKey);
final MirrorResult firstResult = results.get(0);
assertThat(firstResult.mirrorStatus()).isEqualTo(MirrorStatus.SUCCESS);
assertThat(results.get(1).mirrorStatus()).isEqualTo(MirrorStatus.UP_TO_DATE);
assertThat(results.get(2).mirrorStatus()).isEqualTo(MirrorStatus.UP_TO_DATE);
}

private static MirrorDto newMirror() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.it.mirror.git;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.linecorp.centraldogma.server.mirror.MirrorListener;
import com.linecorp.centraldogma.server.mirror.MirrorResult;
import com.linecorp.centraldogma.server.mirror.MirrorTask;

public class TestMirrorRunnerListener implements MirrorListener {

static final Map<String, Integer> startCount = new ConcurrentHashMap<>();
static final Map<String, List<MirrorResult>> completions = new ConcurrentHashMap<>();
static final Map<String, List<Throwable>> errors = new ConcurrentHashMap<>();

static void reset() {
startCount.clear();
completions.clear();
errors.clear();
}

private static String key(MirrorTask task) {
return task.project().name() + '/' + task.mirror().id() + '/' + task.triggeredBy().login();
}

@Override
public void onStart(MirrorTask mirror) {
startCount.merge(key(mirror), 1, Integer::sum);
}

@Override
public void onComplete(MirrorTask mirror, MirrorResult result) {
final List<MirrorResult> results = new ArrayList<>();
results.add(result);
completions.merge(key(mirror), results, (oldValue, newValue) -> {
oldValue.addAll(newValue);
return oldValue;
});
}

@Override
public void onError(MirrorTask mirror, Throwable cause) {
final List<Throwable> exceptions = new ArrayList<>();
exceptions.add(cause);
errors.merge(key(mirror), exceptions, (oldValue, newValue) -> {
oldValue.addAll(newValue);
return oldValue;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.linecorp.centraldogma.it.mirror.git.TestMirrorRunnerListener
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
import org.junit.jupiter.api.Test;

import com.linecorp.armeria.common.metric.MoreMeters;
import com.linecorp.centraldogma.server.metadata.User;
import com.linecorp.centraldogma.server.mirror.Mirror;
import com.linecorp.centraldogma.server.mirror.MirrorResult;
import com.linecorp.centraldogma.server.mirror.MirrorStatus;
import com.linecorp.centraldogma.server.mirror.MirrorTask;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
Expand All @@ -48,9 +50,11 @@ void testSuccessMetrics() {
final MeterRegistry meterRegistry = new SimpleMeterRegistry();
Mirror mirror = newMirror("git://a.com/b.git", DefaultGitMirror.class, "foo", "bar");
mirror = spy(mirror);
doReturn(new MirrorResult(mirror.id(), "foo", "bar", MirrorStatus.SUCCESS, "", Instant.now()))
.when(mirror).mirror(any(), any(), anyInt(), anyLong());
new MirroringTask(mirror, "foo", meterRegistry).run(null, null, 0, 0L);
doReturn(new MirrorResult(mirror.id(), "foo", "bar", MirrorStatus.SUCCESS, "", Instant.now(),
Instant.now()))
.when(mirror).mirror(any(), any(), anyInt(), anyLong(), any());
final MirrorTask mirrorTask = new MirrorTask(mirror, User.SYSTEM, Instant.now(), true);
new InstrumentedMirroringJob(mirrorTask, meterRegistry).run(null, null, 0, 0L);
assertThat(MoreMeters.measureAll(meterRegistry))
.contains(entry("mirroring.result#count{direction=LOCAL_TO_REMOTE,localPath=/," +
"localRepo=bar,project=foo,remoteBranch=,remotePath=/,success=true}", 1.0));
Expand All @@ -62,8 +66,9 @@ void testFailureMetrics() {
Mirror mirror = newMirror("git://a.com/b.git#main", DefaultGitMirror.class, "foo", "bar");
mirror = spy(mirror);
final RuntimeException e = new RuntimeException();
doThrow(e).when(mirror).mirror(any(), any(), anyInt(), anyLong());
final MirroringTask task = new MirroringTask(mirror, "foo", meterRegistry);
doThrow(e).when(mirror).mirror(any(), any(), anyInt(), anyLong(), any());
final MirrorTask mirrorTask = new MirrorTask(mirror, User.SYSTEM, Instant.now(), true);
final InstrumentedMirroringJob task = new InstrumentedMirroringJob(mirrorTask, meterRegistry);
assertThatThrownBy(() -> task.run(null, null, 0, 0L))
.isSameAs(e);
assertThat(MoreMeters.measureAll(meterRegistry))
Expand All @@ -80,8 +85,9 @@ void testTimerMetrics() {
doAnswer(invocation -> {
Thread.sleep(1000);
return null;
}).when(mirror).mirror(any(), any(), anyInt(), anyLong());
new MirroringTask(mirror, "foo", meterRegistry).run(null, null, 0, 0L);
}).when(mirror).mirror(any(), any(), anyInt(), anyLong(), any());
final MirrorTask mirrorTask = new MirrorTask(mirror, User.SYSTEM, Instant.now(), true);
new InstrumentedMirroringJob(mirrorTask, meterRegistry).run(null, null, 0, 0L);
assertThat(MoreMeters.measureAll(meterRegistry))
.hasEntrySatisfying(
"mirroring.task#total{direction=LOCAL_TO_REMOTE,localPath=/," +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.linecorp.centraldogma.server.internal.api.auth.RequiresWritePermission;
import com.linecorp.centraldogma.server.internal.mirror.MirrorRunner;
import com.linecorp.centraldogma.server.internal.storage.project.ProjectApiManager;
import com.linecorp.centraldogma.server.metadata.User;
import com.linecorp.centraldogma.server.mirror.Mirror;
import com.linecorp.centraldogma.server.mirror.MirrorResult;
import com.linecorp.centraldogma.server.storage.project.Project;
Expand Down Expand Up @@ -142,9 +143,9 @@ private CompletableFuture<PushResultDto> createOrUpdate(String projectName,
// Mirroring may be a long-running task, so we need to increase the timeout.
@RequestTimeout(value = 5, unit = TimeUnit.MINUTES)
@Post("/projects/{projectName}/mirrors/{mirrorId}/run")
public CompletableFuture<MirrorResult> runMirror(@Param String projectName, @Param String mirrorId)
throws Exception {
return mirrorRunner.run(projectName, mirrorId);
public CompletableFuture<MirrorResult> runMirror(@Param String projectName, @Param String mirrorId,
User user) throws Exception {
return mirrorRunner.run(projectName, mirrorId, user);
}

private static MirrorDto convertToMirrorDto(String projectName, Mirror mirror) {
Expand Down
Loading

0 comments on commit bed87a2

Please sign in to comment.