Skip to content

Commit

Permalink
验证Limited自定义线程,未验证成功
Browse files Browse the repository at this point in the history
  • Loading branch information
WG-chenchuangkun committed Aug 25, 2023
1 parent 682771d commit 3173d39
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ protected CustomForkJoinWorkerThread(ForkJoinPool pool, String threadName) {

/**
* 线程终止时,执行的清理动作
* 问:为什么自定义线程的onTermination()方法,不会在线程回收时被调用?
* 答:onTermination() 方法仅在线程执行任务时抛出未捕获异常的情况下被调用,它并不是在线程被回收时执行的方法。
* 【场景1:不结合ManagedBlocker的情况下】
* 已验证:在线程执行完任务,且没有窃取到其他任务时,会执行 onTermination()
* 具体见:CustomForkJoinWorkerThreadFactoryTest
*/
@Override
protected void onTermination(Throwable exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class CustomForkJoinWorkerThreadFactory implements ForkJoinPool.ForkJoinW
/**
* 线程编号
*/
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(0);

public CustomForkJoinWorkerThreadFactory() {
this.threadNamePrefix = PoolConsts.DEFAULT_THREAD_NAME_PREFIX;
Expand All @@ -46,12 +46,12 @@ public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
int threadNum = threadNumber.incrementAndGet();
String newThreadName = getNewThreadName(threadNum);
if (logger.isDebugEnabled()) {
logger.debug("create thread, threadNum={}, newThreadName={}, pool={}", threadNumber.get(), newThreadName, pool.toString());
logger.debug("create thread, newThreadName={}, pool={}", newThreadName, pool.toString());
}

// 当线程编号大于等于最大线程编号时,将线程编号重置
if (threadNum >= PoolConsts.MAX_THREAD_NUMBER) {
threadNumber.compareAndSet(threadNum, 1);
threadNumber.compareAndSet(threadNum, 0);
}

// 使用自定义线程名称
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,15 @@ protected LimitedThreadForkJoinWorkerThread(ForkJoinPool pool, String threadName

/**
* 线程终止时,执行的清理动作
* 问:为什么自定义线程的onTermination()方法,不会在线程回收时被调用?
* 答:onTermination() 方法仅在线程执行任务时抛出未捕获异常的情况下被调用,它并不是在线程被回收时执行的方法。
* 【场景1:不结合ManagedBlocker的情况下】
* 结论:在线程执行完任务,且没有窃取到其他任务时,会执行 onTermination()
* 案例:CustomForkJoinWorkerThreadFactoryTest
*
* 【场景2:结合ManagedBlocker的情况下】
* 案例:ManagedBlockerTest
* 分析:
* 1、当创建的线程序号达到一定数量时(如:20个),任务执行完后,线程一直处于WAIT状态,这就导致无可用线程,且一直不会执行到onTermination()方法,也就不会执行threadCount-1
* 2、同时由于threadCount=maxThreads,导致不会创建新线程,最终出现业务逻辑不被执行的情况。
*/
@Override
protected void onTermination(Throwable exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* 1、支持自定义线程名字
* 2、适用于面对IO阻塞型任务时,通过扩展线程池中的线程数,来提高执行效率的场景,配合ManagedBlocker使用
* 注意:需通过LimitedThreadForkJoinWorkerThreadFactory,限制ForkJoinPool中创建的最大线程数,避免无限制的创建线程,导致OOM
*
* <p>
* TODO 暂未找到方法对ForkJoinPool的线程回收进行精确控制,因此废弃该类
*
* @author chenck
Expand All @@ -37,12 +37,12 @@ public class LimitedThreadForkJoinWorkerThreadFactory implements ForkJoinPool.Fo
/**
* 线程编号,用于给线程命名
*/
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(0);

/**
* 线程数量,用于控制创建线程的数量
*/
private final AtomicInteger threadCount = new AtomicInteger(1);
private final AtomicInteger threadCount = new AtomicInteger(0);

public LimitedThreadForkJoinWorkerThreadFactory(int maxThreads) {
this.maxThreads = maxThreads;
Expand Down Expand Up @@ -82,13 +82,13 @@ public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
int threadNum = threadNumber.incrementAndGet();
String newThreadName = getNewThreadName(threadNum);
if (logger.isDebugEnabled()) {
logger.debug("create thread, threadNum={}, threadCount={}, maxThreads={}, newThreadName={}, pool={}", threadNumber.get(), threadCount.get(), maxThreads, newThreadName, pool.toString());
logger.debug("create thread, threadCount={}, maxThreads={}, newThreadName={}, pool={}", threadCount.get(), maxThreads, newThreadName, pool.toString());
}
System.out.println("create thread, threadCount=" + threadCount + ", maxThreads=" + maxThreads + " , pool=" + pool.toString() + "newThreadName=" + newThreadName);
System.out.println("create thread, threadCount=" + threadCount + ", maxThreads=" + maxThreads + ", newThreadName=" + newThreadName + " , pool=" + pool.toString());

// 当线程编号大于等于最大线程编号时,将线程编号重置
if (threadNum >= PoolConsts.MAX_THREAD_NUMBER) {
threadNumber.compareAndSet(threadNum, 1);
threadNumber.compareAndSet(threadNum, 0);
}

// 使用自定义线程名称
Expand All @@ -98,9 +98,9 @@ public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
// 如果当前线程数量超过最大线程数,则不创建新线程,并将threadCount-1
threadCount.decrementAndGet();
if (logger.isDebugEnabled()) {
logger.debug("Exceeded maximum number of threads, threadNum={}, threadCount={}, maxThreads={}, pool={}", threadNumber.get(), threadCount.get(), maxThreads, pool.toString());
logger.debug("Exceeded maximum number of threads, threadCount={}, maxThreads={}, threadNum={}, pool={}", threadCount.get(), maxThreads, threadNumber.get(), pool.toString());
}
System.out.println("Exceeded maximum number of threads, threadNum=" + threadNumber.get() + ", threadCount=" + threadCount.get() + ", maxThreads=" + maxThreads + " , pool=" + pool.toString());
System.out.println("Exceeded maximum number of threads, threadName=" + Thread.currentThread().getName() + ", threadCount=" + threadCount.get() + ", maxThreads=" + maxThreads + ", threadNum=" + threadNumber.get() + " , pool=" + pool.toString());
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,32 @@ public MyManagedBlocker(Object key, Function function) {
this.function = function;
}


@Override
public boolean block() throws InterruptedException {
// 当阻塞条件满足时,返回 true,否则返回 false
done = true;
if (null == result) {
// 执行阻塞操作,这可能是一个 IO 操作、等待锁等
result = function.apply(key);
}

// 执行阻塞操作,这可能是一个 IO 操作、等待锁等
result = function.apply(key);

// 返回 true 表示阻塞成功,返回 false 表示阻塞被中断
return false;
// true 表示完成执行阻塞操作(正常结束或被中断),需要释放阻塞
done = true;
return true;
}

/**
* 判断是否可以释放阻塞
* <p>
* 建议:方法 isReleasable()在 ForkJoinPool.managedBlock 中被调用,需要特别注意返回true和false,返回值不同造成的结果不一样,需要结合源码进行了解。
* <p>
* 返回 true,表示释放阻塞,如果第一次执行isReleasable()就返回true,那么block()方法不会被执行,也就是说不会执行具体的业务逻辑。
* 返回 false,表示继续阻塞,会通过 ForkJoinPool.tryCompensate() 尝试释放或创建一个补偿线程来处理阻塞。
* <p>
*
* @return true,表示释放阻塞,false,表示继续阻塞
*/
@Override
public boolean isReleasable() {
// 判断是否可以释放阻塞
// 返回 true,表示可以释放阻塞,返回 false,表示继续阻塞
return done;
return done || result != null;
}

public Object getResult() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.github.jesse.l2cache.test;

import com.github.jesse.l2cache.util.pool.CustomForkJoinWorkerThreadFactory;
import com.github.jesse.l2cache.util.pool.LimitedThreadForkJoinWorkerThreadFactory;
import org.junit.Test;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
Expand All @@ -14,44 +16,71 @@
* @date 2023/8/23 23:55
*/
public class CustomForkJoinWorkerThreadFactoryTest {
// 线程池并行度(线程数)
public static final int parallelism = 5;
public static final int maxThreads = 10;
public static final String threadNamePrefix = "test";

public static void main(String[] args) throws InterruptedException {
// 线程池并行度(线程数)
int parallelism = 5;
/**
* 验证 CustomForkJoinWorkerThreadFactory
* 结果:
* 1、通过Custom自定义线程工程中创建线程,线程池中最大线程数为5,等同于使用默认的DefaultForkJoinWorkerThreadFactory)
* 2、在线程执行完任务,且没有窃取到其他任务时,会执行 onTermination()
*/
@Test
public void testCustomForkJoinWorkerThreadFactory() throws InterruptedException {
ForkJoinPool pool = new ForkJoinPool(parallelism, new CustomForkJoinWorkerThreadFactory(threadNamePrefix), null, false);
loopTest(pool);
}

/**
* 验证 LimitedThreadForkJoinWorkerThreadFactory
* 结果:
* 1、通过Limited自定义线程工程中创建线程,线程池中最大线程数为5,线程数不会达到maxThreads(等同于使用默认的DefaultForkJoinWorkerThreadFactory)
* 2、在线程执行完任务,且没有窃取到其他任务时,会执行 onTermination()
*/
@Test
public void testLimitedThreadForkJoinWorkerThreadFactory() throws InterruptedException {
ForkJoinPool pool = new ForkJoinPool(parallelism, new LimitedThreadForkJoinWorkerThreadFactory(maxThreads, threadNamePrefix), null, false);
loopTest(pool);
}

// 验证结果:ForkJoinPool自定义线程名称后,最大线程数为5(等同于使用默认的DefaultForkJoinWorkerThreadFactory)
ForkJoinPool pool = new ForkJoinPool(parallelism, new CustomForkJoinWorkerThreadFactory("test"), null, false);
test1(pool, 0);
public void loopTest(ForkJoinPool pool) throws InterruptedException {
// 模拟执行业务逻辑
submitTaskToPool(pool, 0);

int execNum = 0;
while (true) {
execNum++;
Thread.sleep(1000);
System.out.println("execNum=" + execNum + " , " + threadDateTimeInfo() + ", pool=" + pool.toString());
System.out.println("execNum=" + execNum + " , " + threadDateTimeInfo() + ", pool=" + pool);

if (execNum % 30 == 0) {
System.out.println("execNum=" + execNum);
test1(pool, execNum);
// 模拟执行业务逻辑
submitTaskToPool(pool, execNum);
Thread.sleep(30000);
}
}
}

static void test1(ForkJoinPool pool, int execNum) {

/**
* 模拟提交任务到ForkJoinPool
*/
public void submitTaskToPool(ForkJoinPool pool, int execNum) {
String key = "key_" + execNum + "_";
for (int i = 0; i < 100; i++) {
for (int i = 0; i < 50; i++) {
int finalI = i;
// 构建任务并提交到线程池
pool.execute(new RecursiveTask<Object>() {
@Override
protected Object compute() {
try {
String result = key + "" + finalI;
System.out.println(threadDateTimeInfo() + ", 休眠2s, result=" + result);
Thread.sleep(2000);// 模拟IO阻塞任务
//System.out.println(threadDateTimeInfo() + ", 休眠1s, result=" + result);
Thread.sleep(1000);// 模拟IO阻塞任务

System.out.println(threadDateTimeInfo() + ", 休眠2s, result=" + result + ", RunningThreadCount=" + pool.getRunningThreadCount() + ", ActiveThreadCount=" + pool.getActiveThreadCount() + ", PoolSize=" + pool.getPoolSize());
System.out.println(threadDateTimeInfo() + ", 休眠1s, result=" + result + ", pool=" + pool);
setRawResult(result);
return getRawResult();
} catch (InterruptedException e) {
Expand All @@ -62,7 +91,7 @@ protected Object compute() {
}
}

static String threadDateTimeInfo() {
public String threadDateTimeInfo() {
return DateTimeFormatter.ISO_TIME.format(LocalTime.now()) + " " + Thread.currentThread().getName() + " ";
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.github.jesse.l2cache.test;

import com.github.jesse.l2cache.util.pool.CustomForkJoinWorkerThreadFactory;
import com.github.jesse.l2cache.util.pool.LimitedThreadForkJoinWorkerThreadFactory;
import com.github.jesse.l2cache.util.pool.MyManagedBlocker;
import com.sun.xml.internal.bind.v2.TODO;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
Expand All @@ -21,17 +23,28 @@
* @date 2023/5/5 18:44
*/
public class ManagedBlockerTest {
// 线程池并行度(线程数)
public static final int parallelism = 5;
public static final int maxThreads = 10;
public static final String threadNamePrefix = "test";

public static void main(String[] args) throws InterruptedException {
// 线程池并行度(线程数)
int parallelism = 5;
// 最大线程数
int maxThreads = 10;
// 创建自定义的ForkJoinPool
// 在管理阻塞时,通过自定义ForkJoinWorkerThreadFactory来限制最大可创建的线程数,避免无限制的创建线程
// 适用于面对IO阻塞型任务时,通过扩展线程池中的线程数,来提高执行效率的场景
ForkJoinPool pool = new ForkJoinPool(parallelism, new LimitedThreadForkJoinWorkerThreadFactory(maxThreads), null, false);
test(pool, 0);
// TODO 验证未通过:使用ManagedBlocker后,可以扩展的备用线程,但运行一段时间后,线程会出现阻塞的情况,导致无线程可用,所以未验证通过
ForkJoinPool pool = new ForkJoinPool(parallelism, new LimitedThreadForkJoinWorkerThreadFactory(maxThreads, threadNamePrefix), null, false);

// 不限制最大可创建的线程数,可能会OOM
// ForkJoinPool pool = new ForkJoinPool(parallelism, new CustomForkJoinWorkerThreadFactory(threadNamePrefix), null, false);

loopTest(pool);
}


static void loopTest(ForkJoinPool pool) throws InterruptedException {
// 模拟执行业务逻辑
submitTaskToPool(pool, 0);

int execNum = 0;
while (true) {
Expand All @@ -41,13 +54,16 @@ public static void main(String[] args) throws InterruptedException {

if (execNum % 30 == 0) {
System.out.println("execNum=" + execNum);
test(pool, execNum);
Thread.sleep(30000);
submitTaskToPool(pool, execNum);
Thread.sleep(10000);
}
}
}

static void test(ForkJoinPool pool, int execNum) {
/**
* 模拟提交任务到ForkJoinPool
*/
static void submitTaskToPool(ForkJoinPool pool, int execNum) {

String key = "key_" + execNum + "_";
for (int i = 0; i < 50; i++) {
Expand All @@ -57,23 +73,24 @@ static void test(ForkJoinPool pool, int execNum) {
@Override
protected Object compute() {
try {
MyManagedBlocker myManagedBlocker = new MyManagedBlocker(key + "" + finalI, key -> {
System.out.println(threadDateTimeInfo() + ", 休眠2s, result=" + key);
MyManagedBlocker blocker = new MyManagedBlocker(key + "" + finalI, key -> {
System.out.println(threadDateTimeInfo() + ", 休眠1s, result=" + key);
try {
Thread.sleep(2000);// 模拟IO阻塞任务
Thread.sleep(1000);// 模拟IO阻塞任务
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return key;
});


// MyManagedBlocker 让线程池知道当前任务即将阻塞,因此需要创建新的补偿工作线程来执行新的提交任务
// 运行指定的阻塞任务。当ForkJoinTask 在 ForkJoinPool 中运行时,此方法可能会在必要时创建备用线程,以确保当前线程在 ManagedBlockerblock.block() 中阻塞时有足够的并行性。
ForkJoinPool.managedBlock(myManagedBlocker);
ForkJoinPool.managedBlock(blocker);

System.out.println(threadDateTimeInfo() + ", 休眠2s, result=" + myManagedBlocker.getResult() + ", pool=" + pool);
setRawResult(myManagedBlocker.getResult());
System.out.println(threadDateTimeInfo() + ", 休眠1s, result=" + blocker.getResult() + ", pool=" + pool);
setRawResult(blocker.getResult());
return getRawResult();

} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/
public class Test4 {

public static void main(String[] args) {
public static void main(String[] args) throws InterruptedException {
Map<String, String> l1KeyMap = new HashMap<>();
l1KeyMap.put("key1", "key1");
l1KeyMap.put("key2", "key2");
Expand All @@ -27,5 +27,18 @@ public static void main(String[] args) {
// .collect(Collectors.toMap(entry -> l1KeyMap.get(entry.getKey()), entry -> entry.getValue()));
.collect(HashMap::new, (map, entry) -> map.put(l1KeyMap.get(entry.getKey()), entry.getValue()), HashMap::putAll);
System.out.println(l2HitMapTemp);

int count = 0;
boolean relesable = false;
while (!relesable) {
count++;
Thread.sleep(500);
System.out.println("in while count=" + count);
if (count % 10 == 0) {
relesable = true;
}
}

System.out.println("out while count=" + count);
}
}

0 comments on commit 3173d39

Please sign in to comment.