Skip to content

Commit

Permalink
Pool can have a companion ticker thread (#236)
Browse files Browse the repository at this point in the history
* Pool can have a companion ticker thread

* checkstyle

* checkstyle

* fine grain ticker with duration.lowest

* ticker error handling
  • Loading branch information
matthieun authored and lucaspcram committed Oct 12, 2018
1 parent ec8d9f8 commit a892cb4
Show file tree
Hide file tree
Showing 7 changed files with 391 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ public int hashCode()
return Long.hashCode(this.milliseconds);
}

public Duration highest(final Duration other)
{
if (other == null || this.isMoreThanOrEqualsTo(other))
{
return this;
}
return other;
}

public boolean isCloseTo(final Duration that, final Duration safe)
{
return difference(that).isLessThanOrEqualsTo(safe);
Expand All @@ -126,6 +135,15 @@ public boolean isMoreThanOrEqualsTo(final Duration that)
return this.milliseconds >= that.milliseconds;
}

public Duration lowest(final Duration other)
{
if (other == null || this.isLessThanOrEqualsTo(other))
{
return this;
}
return other;
}

public long millisecondsOfSecond()
{
return this.asMilliseconds() % MILLISECONDS_PER_SECOND;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.openstreetmap.atlas.utilities.threads;

import org.openstreetmap.atlas.utilities.scalars.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author matthieun
*/
public class LogTicker extends Ticker
{
private static final Logger logger = LoggerFactory.getLogger(LogTicker.class);

public LogTicker(final String name, final Duration tickerTime)
{
super(name, tickerTime);
}

@Override
protected void tickAction(final Duration sinceStart)
{
logger.info("{}: {}", getName(), sinceStart);
}
}
34 changes: 34 additions & 0 deletions src/main/java/org/openstreetmap/atlas/utilities/threads/Pool.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,45 @@ public <T, V> V queue(final Callable<T> task, final Function<T, V> doWithTheOutp
return doWithTheOutput.apply(item);
}

public <T> Result<T> queue(final Callable<T> task, final Ticker ticker)
{
final Callable<T> taskWrapper = () ->
{
try
{
return task.call();
}
finally
{
ticker.close();
}
};
this.queue(ticker);
return new Result<>(this.pool.submit(taskWrapper), this, ticker);
}

public void queue(final Runnable command)
{
this.pool.execute(command);
}

public void queue(final Runnable command, final Ticker ticker)
{
final Runnable commandWrapper = () ->
{
try
{
command.run();
}
finally
{
ticker.close();
}
};
this.pool.execute(ticker);
this.pool.execute(commandWrapper);
}

public <T> List<Result<T>> queueAll(final Iterable<Callable<T>> tasks)
{
try
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.openstreetmap.atlas.utilities.threads;

import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand All @@ -19,15 +20,25 @@ public class Result<T>
{
private final Future<T> future;
private final Pool pool;
private final Optional<Ticker> ticker;

public Result(final Future<T> future, final Pool pool)
{
this.pool = pool;
this.future = future;
this.ticker = Optional.empty();
}

public Result(final Future<T> future, final Pool pool, final Ticker ticker)
{
this.pool = pool;
this.future = future;
this.ticker = Optional.of(ticker);
}

public boolean cancel(final boolean mayInterruptIfRunning)
{
this.ticker.ifPresent(Ticker::close);
return this.future.cancel(mayInterruptIfRunning);
}

Expand All @@ -41,6 +52,10 @@ public T get()
{
throw new CoreException("Could not get value from Future in {}", this.pool, e);
}
finally
{
this.ticker.ifPresent(Ticker::close);
}
}

public T get(final Duration timeout) throws TimeoutException
Expand All @@ -55,6 +70,10 @@ public T get(final Duration timeout) throws TimeoutException
"Interrupted before {} elapsed. Could not get value from Future in {}", timeout,
this.pool, e);
}
finally
{
this.ticker.ifPresent(Ticker::close);
}
}

public boolean isCancelled()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package org.openstreetmap.atlas.utilities.threads;

import java.io.Closeable;

import org.openstreetmap.atlas.utilities.scalars.Duration;
import org.openstreetmap.atlas.utilities.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Ticker companion.
*
* @author matthieun
*/
public abstract class Ticker implements Runnable, Closeable
{
private static final Logger logger = LoggerFactory.getLogger(Ticker.class);
private static final Duration CHECK_TIME = Duration.milliseconds(500);

private final String name;
private final Duration tickerTime;

// This tells the ticker it is time to stop ticking.
private volatile boolean stop;

/**
* @param name
* The name of the ticker companion
* @param tickerTime
* The duration between each tick. It is indicative only.
*/
public Ticker(final String name, final Duration tickerTime)
{
this.name = name;
this.tickerTime = tickerTime;
this.stop = false;
}

@Override
public void close()
{
this.stop = true;
}

public String getName()
{
return this.name;
}

@Override
public void run()
{
final Time start = Time.now();
Time lastCheck = Time.now();
while (!this.stop)
{
// Sleep small, to check regularly. If the thread is "closed" it will then die fairly
// soon even if the ticker time is really long.
CHECK_TIME.lowest(this.tickerTime).sleep();
if (lastCheck.elapsedSince().isMoreThan(this.tickerTime))
{
try
{
tickAction(start.elapsedSince());
}
catch (final Exception e)
{
// In case of any error in tickAction, kill the ticker silently, with a nice
// error message.
logger.error("{} tick action failed! Associated job should not be affected.",
getName(), e);
}
lastCheck = Time.now();
}
}
}

@Override
public String toString()
{
return "Ticker [name=" + this.name + ", tickerTime=" + this.tickerTime + "]";
}

/**
* Act upon a tick event.
*
* @param sinceStart
* The duration elapsed since the start of the ticker.
*/
protected abstract void tickAction(Duration sinceStart);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,17 @@ public void testEquals()
Assert.assertTrue(Duration.seconds(3600).equals(Duration.hours(1)));
Assert.assertFalse(Duration.seconds(1.001).equals(Duration.seconds(1.002)));
}

@Test
public void testLowestAndHighest()
{
final Duration lowest = Duration.seconds(5);
final Duration highest = Duration.seconds(10);
Assert.assertEquals(lowest, lowest.lowest(highest));
Assert.assertEquals(lowest, highest.lowest(lowest));
Assert.assertEquals(highest, highest.lowest(null));
Assert.assertEquals(highest, lowest.highest(highest));
Assert.assertEquals(highest, highest.highest(lowest));
Assert.assertEquals(lowest, lowest.highest(null));
}
}
Loading

0 comments on commit a892cb4

Please sign in to comment.