ThreadPoolExecutor가 큐잉하기 전에 스레드를 최대값으로 늘리려면 어떻게 해야 합니까?
인 '어디서나'가요.ThreadPoolExecutor
은, 「」의 에 있습니다.ExecutorService
많은 분들이 쓰시는 실타래 같은 거.Javadocs:
corePoolSize보다 크고 실행 중인 maximumPoolSize 스레드 수가 적을 경우 큐가 꽉 찬 경우에만 새 스레드가 생성됩니다.
즉, 다음 코드를 사용하여 스레드 풀을 정의하면 두 번째 스레드는 시작되지 않습니다.LinkedBlockingQueue
계는없없 없없없다다
ExecutorService threadPool =
new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
큐가 한정되어 있고 큐가 꽉 찬 경우에만 코어 번호를 초과하는 스레드가 시작됩니다.많은 주니어 Java 멀티스레드 프로그래머들이 이러한 동작에 대해 모르고 있는 것으로 추측됩니다.ThreadPoolExecutor
.
이것이 최적이 아닌 구체적인 사용 사례가 있습니다.저는 TPE 수업을 직접 작성하지 않고 그 문제를 해결할 방법을 찾고 있습니다.
신뢰할 수 없는 서드파티에 콜백하는 웹 서비스에 대한 요구 사항입니다.
- 콜백을 Web 요구와 동기화하지 않기 때문에 스레드 풀을 사용합니다.
- 에 몇 에, 「1」이라고 은 하고 싶지 .
newFixedThreadPool(...)
대부분 휴면 상태인 스레드 수가 많습니다. - 이 트래픽이 버스트하는 경우가 자주 있기 때문에 스레드 수를 최대값(50개 등)으로 스케일업하고 싶습니다.
- 모든 콜백을 최대한 시도해야 하기 때문에 50을 초과하는 콜백을 큐잉할 수 있습니다.다른 웹 서버를 너무 많이 사용하는 것은 원치 않습니다.
newCachedThreadPool()
.
이 할 수 요?ThreadPoolExecutor
더 많은 스레드를 시작하기 전에 큐를 바운딩하고 꽉 채워야 하는 경우작업을 큐잉하기 전에 더 많은 스레드를 시작하도록 하려면 어떻게 해야 합니까?
편집:
@@Flavio를 하는 것이 .ThreadPoolExecutor.allowCoreThreadTimeOut(true)
코어 스레드가 타임아웃되어 종료되도록 합니다.나는 그것을 고려했지만 여전히 코어 스레드 기능을 원했다.가능하면 풀의 스레드 수가 코어 크기 아래로 떨어지는 것을 원하지 않았습니다.
이 할 수 요?
ThreadPoolExecutor
더 많은 스레드를 시작하기 전에 큐를 바운딩하고 꽉 채워야 합니다.
저는 드디어 진부한) 생각합니다.ThreadPoolExecutor
'연장하다'를 붙입니다.LinkedBlockingQueue
false
★★★★★★에queue.offer(...)
미미 、 인인인인중경 경경경경 。현재 스레드가 큐잉된 작업을 따라가지 못할 경우 TPE에 의해 스레드가 추가됩니다. 이미 스레드 수에 , " " " " " " " "RejectedExecutionHandler
예요.put(...)
을 사용하다
보면 요.offer(...)
할 수 있다false
★★★★★★★★★★★★★★★★★」put()
절대 차단하지 않기 때문에 그게 해킹 부분입니다.그러나 이것은 TPE의 큐 사용법과는 잘 맞아떨어지기 때문에 문제없다고 생각합니다.
코드는 다음과 같습니다.
// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
private static final long serialVersionUID = -6903933921423432194L;
@Override
public boolean offer(Runnable e) {
// Offer it to the queue if there is 0 items already queued, else
// return false so the TPE will add another thread. If we return false
// and max threads have been reached then the RejectedExecutionHandler
// will be called which will do the put into the queue.
if (size() == 0) {
return super.offer(e);
} else {
return false;
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// This does the actual put into the queue. Once the max threads
// have been reached, the tasks will then queue up.
executor.getQueue().put(r);
// we do this after the put() to stop race conditions
if (executor.isShutdown()) {
throw new RejectedExecutionException(
"Task " + r + " rejected from " + e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});
하면, 「」가 .ThreadPoolExecutor
용용 :
- 스레드 수를 처음에는 코어 크기(여기서 1)까지 확장합니다.
- 큐에 넣어주세요.큐가 비어 있는 경우 기존 스레드로 처리되도록 큐잉됩니다.
- , "1"은 " " " 입니다.
offer(...)
거짓으로 속이다 - false가 반환되면 풀의 스레드 수를 최대 수(여기서는 50)에 이를 때까지 확장합니다.
- '아까', '아까', '아까', '아까', '아까'라고 .
RejectedExecutionHandler
RejectedExecutionHandler
다음으로 사용 가능한 첫 번째 스레드에 의해 처리되는 작업을 FIFO 순서로 큐에 넣습니다.
위의 예제 코드에서는 큐가 무제한이지만, 이를 경계 큐로 정의할 수도 있습니다.를 들어, " "에하면 "1000"이 됩니다.LinkedBlockingQueue
그러면 다음과 같이 됩니다.
- 스레드를 최대까지 확대하다
- 1000개의 태스크가 가득 찰 때까지 큐잉합니다.
- 큐에 빈 공간이 생길 때까지 발신자를 차단합니다.
에 ,, 요, 요, 필을 사용해야 한다면offer(...)
RejectedExecutionHandler
ㅇㅇㅇㅇ를 할 수 .offer(E, long, TimeUnit)
" "를 사용합니다.Long.MAX_VALUE
타임아웃으로 합니다.
경고:
실행자가 종료된 후 태스크가 실행자에 추가될 것으로 예상하는 경우 보다 스마트하게 실행 프로그램을 실행할 수 있습니다.RejectedExecutionException
관습에 RejectedExecutionHandler
이치노@RaduToader @RaduToader입니다.
편집:
이 답변의 또 다른 수정사항은 유휴 스레드가 있는지 TPE에 문의하고 유휴 스레드가 있는 경우에만 항목을 큐잉하는 것입니다. 해서 진짜 에다가 더하면 될 것 요.ourQueue.setThreadPoolExecutor(tpe);
법이있있 있있있다다
당신의 ★★★★★★★★★★★★★★★★★.offer(...)
을 사용하다
- " " " " 가 있는지 합니다.
tpe.getPoolSize() == tpe.getMaximumPoolSize()
에는 그냥 하면 돼요.super.offer(...)
. - 않은 경우
tpe.getPoolSize() > tpe.getActiveCount()
후, 「」를 호출합니다.super.offer(...)
실타래가 없는 것 같아서요. - 않으면 " "를 반환한다.
false
하다
아마도 다음과 같습니다.
int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
return super.offer(e);
} else {
return false;
}
에 액세스하기 .volatile
또는 (「」의 )getActiveCount()
스레드목록을 TPE를 잠그고 스레드목록을 확인합니다.또한 여기에는 유휴 스레드가 있을 때 작업이 부적절하게 큐잉되거나 다른 스레드가 분기될 수 있는 레이스 조건이 있습니다.
나는 이미 이 문제에 대해 두 개의 다른 답을 얻었지만, 나는 이것이 최선이라고 생각한다.
이는 현재 받아들여지고 있는 답변의 기술, 즉 다음과 같은 것을 기반으로 합니다.
- 의 " " " 를 .
offer()
false, return to false(가끔) - 에, 「」가 됩니다.
ThreadPoolExecutor
하거나, 을 거부합니다. - 설정하다
RejectedExecutionHandler
작업을 거부하면 큐잉할 수 있습니다.
문제는 언제인가 하는 것이다.offer()
거짓으로 속이다큐에 태스크가 몇 개 있을 경우 현재 받아들여진 답변은 false를 반환하지만, 이미 코멘트에서도 지적했듯이 이는 바람직하지 않은 결과를 초래합니다.또는 항상 false를 반환할 경우 대기열에 스레드가 있더라도 새 스레드가 계속 생성됩니다.
해결책은 Java 7을 사용하여offer()
tryTransfer()
대기 중인 컨슈머 스레드가 있는 경우 해당 스레드로 태스크가 전달됩니다. 않으면, 「」가 됩니다.offer()
하고 false를 한다.ThreadPoolExecutor
이치노
BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
@Override
public boolean offer(Runnable e) {
return tryTransfer(e);
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
를 같은할 수 .allowCoreThreadTimeOut(true)
.
여기 훨씬 더 알기 쉬운 버전이 있습니다.새 작업이 실행될 때마다 corePoolSize(최대 풀 크기 제한)를 늘린 다음 작업이 완료될 때마다 corePoolSize(사용자 지정 "코어 풀 크기" 제한)를 줄입니다.
즉, 실행 중인 작업 또는 대기 중인 작업 수를 추적하여 corePoolSize가 사용자가 지정한 "코어 풀 크기"와 최대 풀 크기 사이에 있는 한 작업 수와 동일한지 확인합니다.
public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
private int userSpecifiedCorePoolSize;
private int taskCount;
public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
userSpecifiedCorePoolSize = corePoolSize;
}
@Override
public void execute(Runnable runnable) {
synchronized (this) {
taskCount++;
setCorePoolSizeToTaskCountWithinBounds();
}
super.execute(runnable);
}
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
super.afterExecute(runnable, throwable);
synchronized (this) {
taskCount--;
setCorePoolSizeToTaskCountWithinBounds();
}
}
private void setCorePoolSizeToTaskCountWithinBounds() {
int threads = taskCount;
if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
setCorePoolSize(threads);
}
}
된 바와 같이 이 후 지정 변경을 하지 않습니다.하거나 corePoolSize를 통해 조작하는.remove()
★★★★★★★★★★★★★★★★★」purge()
.
of of of of of of of of of of of of of of of of의 가 있습니다.ThreadPoolExecutor
비용이 .creationThreshold
「」를 덮어씁니다.execute
.
public void execute(Runnable command) {
super.execute(command);
final int poolSize = getPoolSize();
if (poolSize < getMaximumPoolSize()) {
if (getQueue().size() > creationThreshold) {
synchronized (this) {
setCorePoolSize(poolSize + 1);
setCorePoolSize(poolSize);
}
}
}
}
그것도 도움이 될 수 있지만, 물론 당신의 것이 더 예술적으로 보입니다.
권장되는 답변에서는 JDK 스레드 풀의 문제 중 다음 중 하나만 해결됩니다.
JDK 스레드 풀은 큐잉에 치우쳐 있습니다.따라서 새 스레드를 생성하는 대신 작업을 대기열에 넣습니다.큐가 제한에 도달한 경우에만 스레드 풀에서 새 스레드가 생성됩니다.
부하가 가벼워질 때 스레드 폐기는 발생하지 않습니다.예를 들어 풀에 도달하는 작업의 버스트가 발생하여 풀의 부하가 최대가 된 후 한 번에 최대 2개의 작업 부하가 가벼운 경우 풀은 모든 스레드를 사용하여 스레드 폐기를 방지합니다.(2개의 스레드만 있으면 됩니다.)
위의 동작에 만족하지 못하고 위의 결함을 극복하기 위해 풀을 구현했습니다.
2) Lifo 스케줄링을 사용하면 문제가 해결됩니다.이 아이디어는 Ben Maurer가 ACM 응용 프로그램 2015 컨퍼런스에서 발표한 것입니다.시스템 @ Facebook 스케일
그래서 새로운 구현이 탄생했습니다.
지금까지의 실장에서는, ZEL 의 비동기 실행 퍼포먼스가 향상되고 있습니다.
이 구현은 스핀 기능을 통해 컨텍스트 스위치 오버헤드를 줄여 특정 사용 사례에 대해 뛰어난 성능을 제공합니다.
도움이 됐으면 좋겠는데...
PS: JDK Fork Join Pool은 Executer Service를 구현하여 "일반" 스레드 풀로서 기능합니다.실장은 퍼포먼스적이며 LIFO 스레드 스케줄링을 사용합니다.단, 내부 큐 크기나 폐기 타임아웃은 제어할 수 없습니다.가장 중요한 것은 작업을 취소할 때 중단하지 않는 것입니다.
큐를 false로 되돌리겠다는 당초 생각에 따라 다른 제안이 있습니다. 작업이 수 에는 항상 큐에 넣을 수 .execute()
센티넬 노옵
이 있기 LinkedBlockingQueue
새 태스크의 경우 사용 가능한 스레드가 있는 경우에도 태스크가 큐잉될 수 있습니다.사용 가능한 스레드가 있는 경우에도 새 스레드가 생성되지 않도록 하려면 큐에서 새 태스크를 대기하고 있는 스레드의 수를 추적하고 대기 스레드보다 더 많은 태스크가 큐에 있을 때만 새 스레드를 생성해야 합니다.
final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };
final AtomicInteger waitingThreads = new AtomicInteger(0);
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
@Override
public boolean offer(Runnable e) {
// offer returning false will cause the executor to spawn a new thread
if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
else return super.offer(e);
}
@Override
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
try {
waitingThreads.incrementAndGet();
return super.poll(timeout, unit);
} finally {
waitingThreads.decrementAndGet();
}
}
@Override
public Runnable take() throws InterruptedException {
try {
waitingThreads.incrementAndGet();
return super.take();
} finally {
waitingThreads.decrementAndGet();
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
@Override
public void execute(Runnable command) {
super.execute(command);
if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
}
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r == SENTINEL_NO_OP) return;
else throw new RejectedExecutionException();
}
});
제가 생각할 수 있는 최선의 해결책은 연장하는 것입니다.
ThreadPoolExecutor
에는 몇 방식이 .「 」 、 「 」 、 。beforeExecute
★★★★★★★★★★★★★★★★★」afterExecute
. 오버플로를 두 큐를 할 수 있습니다 확장에서는 태스크 피드에는 경계 큐를 사용하고 오버플로를 처리하기 위해 두 번째 경계 큐를 사용할 수 있습니다.가 ""를 호출했을 때"submit
、 [ Queue]에 수 있습니다.예외가 발생하면 작업을 오버플로 큐에 넣습니다. '어,어,어,어,어,어,어,어,어,어,afterExecute
작업 완료 후 훅을 눌러 오버플로우 큐에 아무것도 없는지 확인합니다.이렇게 하면 실행자는 먼저 경계 큐의 내용을 처리하고 시간이 허락하면 자동으로 이 경계 없는 큐에서 꺼냅니다.
솔루션보다 작업이 많은 것 같지만 적어도 큐에 예기치 않은 동작을 주는 것은 아닙니다.또한 큐와 스레드의 상태를 체크할 수 있는 더 좋은 방법이 있다고 생각합니다.예외에 의존하지 않고 큐와 스레드의 상태를 체크할 수 있습니다.
주의: 경계 큐가 있는 경우 JDK ThreadPoolExecutor의 경우 오퍼가 false를 반환할 때만 새 스레드를 만듭니다.BackPressure 비트를 생성하여 발신자 스레드에서 직접 콜을 실행하는 CallerRunsPolicy를 사용하여 유용한 정보를 얻을 수 있습니다.
풀에서 생성된 스레드에서 작업을 실행해야 하며 스케줄링하기 위해 UBounded 큐가 있어야 합니다.한편 풀 내의 스레드 수는 corePoolSize와 maximumPoolSize 사이에서 증가하거나 축소될 수 있습니다.이렇게 하면...
유감스럽게도 확장(프라이빗 메서드라고 부릅니다)으로 할 수 없었기 때문에 ThreadPoolExecutor에서 풀 복사 붙여넣기를 하고 실행 메서드를 조금 변경했습니다.
새로운 요청이 와서 모든 스레드가 사용 중일 때 바로 새로운 스레드를 만들고 싶지는 않았습니다(일반적으로 수명이 짧은 태스크가 있기 때문에).임계값을 추가했지만 필요에 따라 변경할 수 있습니다(대부분 IO의 경우 이 임계값을 삭제하는 것이 좋습니다).
private final AtomicInteger activeWorkers = new AtomicInteger(0);
private volatile double threshold = 0.7d;
protected void beforeExecute(Thread t, Runnable r) {
activeWorkers.incrementAndGet();
}
protected void afterExecute(Runnable r, Throwable t) {
activeWorkers.decrementAndGet();
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && this.workQueue.offer(command)) {
int recheck = this.ctl.get();
if (!isRunning(recheck) && this.remove(command)) {
this.reject(command);
} else if (workerCountOf(recheck) == 0) {
this.addWorker((Runnable) null, false);
}
//>>change start
else if (workerCountOf(recheck) < maximumPoolSize //
&& (activeWorkers.get() > workerCountOf(recheck) * threshold
|| workQueue.size() > workerCountOf(recheck) * threshold)) {
this.addWorker((Runnable) null, false);
}
//<<change end
} else if (!this.addWorker(command, false)) {
this.reject(command);
}
}
다음은 코어 풀과 최대 풀 사이즈가 동일한 2개의 스레드 풀을 사용하는 솔루션입니다.두 번째 수영장은 첫 번째 수영장이 사용 중일 때 사용됩니다.
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MyExecutor {
ThreadPoolExecutor tex1, tex2;
public MyExecutor() {
tex1 = new ThreadPoolExecutor(15, 15, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
tex1.allowCoreThreadTimeOut(true);
tex2 = new ThreadPoolExecutor(45, 45, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
tex2.allowCoreThreadTimeOut(true);
}
public Future<?> submit(Runnable task) {
ThreadPoolExecutor ex = tex1;
int excessTasks1 = tex1.getQueue().size() + tex1.getActiveCount() - tex1.getCorePoolSize();
if (excessTasks1 >= 0) {
int excessTasks2 = tex2.getQueue().size() + tex2.getActiveCount() - tex2.getCorePoolSize();;
if (excessTasks2 <= 0 || excessTasks2 / (double) tex2.getCorePoolSize() < excessTasks1 / (double) tex1.getCorePoolSize()) {
ex = tex2;
}
}
return ex.submit(task);
}
}
언급URL : https://stackoverflow.com/questions/19528304/how-to-get-the-threadpoolexecutor-to-increase-threads-to-max-before-queueing
'programing' 카테고리의 다른 글
장고 템플릿 변수 및 Javascript (0) | 2022.09.18 |
---|---|
Vue 구성 요소에서 서드파티 Javascript 라이브러리 사용 (0) | 2022.09.18 |
구문 강조 표시를 즉시 수행할 수 있는 텍스트 영역? (0) | 2022.09.18 |
오류: 테이블 xxx에 대한 테이블스페이스가 존재합니다.가져오기 전에 테이블스페이스를 폐기하십시오. (0) | 2022.09.18 |
MySQL 연결 연산자 (0) | 2022.09.18 |