1. 背景:风控跑批 CPU 飙到 90%+
在金融风控领域,特征计算跑批是一个典型的 CPU 密集型场景。我们的系统架构大致如下:
- 一个调度线程池负责拉取跑批任务
- 任务内部会调用一个共享的「计算线程池」执行特征计算(涉及大量数学运算、规则匹配等)
- 同时,系统还有 RPC 线程在处理实时请求
某天跑批期间,监控告警 CPU 使用率飙升到 90% 以上,实时 RPC 请求开始出现超时。初步分析后,我尝试了一个方案:
将调用计算线程池的那个调度线程池的优先级降到
Thread.MIN_PRIORITY(1),期望计算线程在 CPU 争抢时让步给 RPC 线程。
调整后,CPU 占用确实有所缓解,RPC 超时也减少了。但我对背后的原理并不完全理解——为什么降低调度线程池的优先级,会影响到计算线程池中线程的优先级?
于是我设计了一个实验来验证。
2. 实验:优先级继承的验证
2.1 测试代码
import java.util.concurrent.*;
public class ThreadPoolPriorityTest {
// 全局共享的"计算线程池"(模拟内部计算线程池) private static final ExecutorService poolA = Executors.newFixedThreadPool(2, r -> { Thread t = new Thread(r, "PoolA-Worker"); // 注意:这里我们 *不* 设置优先级,让它保持默认行为 System.out.println("[PoolA ThreadFactory] 创建线程: " + t.getName() + ", 初始优先级: " + t.getPriority() + ", 当前创建线程: " + Thread.currentThread().getName() + " (优先级=" + Thread.currentThread().getPriority() + ")"); return t; });
public static void main(String[] args) throws InterruptedException { System.out.println("=== 主线程启动 ==="); System.out.println("main 线程优先级: " + Thread.currentThread().getPriority());
// 创建第二个线程池:所有线程优先级设为最低 ExecutorService poolB = Executors.newFixedThreadPool(1, r -> { Thread t = new Thread(r, "PoolB-Worker"); t.setPriority(Thread.MIN_PRIORITY); // 显式设为 1 System.out.println("[PoolB ThreadFactory] 创建线程: " + t.getName() + ", 优先级设为: " + t.getPriority() + ", 创建者: " + Thread.currentThread().getName()); return t; });
System.out.println("\n>>> 提交任务到 PoolB(低优先级线程池)<<<"); CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { Thread current = Thread.currentThread(); System.out.println("\n[PoolB 任务开始]"); System.out.println("当前线程: " + current.getName() + ", 优先级: " + current.getPriority());
System.out.println("→ 向 PoolA 提交 CPU 密集型计算任务...");
CountDownLatch latch = new CountDownLatch(1); poolA.submit(() -> { Thread calcThread = Thread.currentThread(); System.out.println("\n[PoolA 计算任务开始]"); System.out.println("计算线程: " + calcThread.getName() + ", 优先级: " + calcThread.getPriority());
long sum = 0; for (long i = 0; i < 100_000_000L; i++) { sum += i * i; } System.out.println("计算完成,结果摘要: " + (sum % 1000)); latch.countDown(); });
try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
System.out.println("[PoolB 任务结束]\n"); }, poolB);
future.join();
poolA.shutdown(); poolB.shutdown(); System.out.println("=== 程序结束 ==="); }}2.2 运行结果
=== 主线程启动 ===main 线程优先级: 5
>>> 提交任务到 PoolB(低优先级线程池)<<<[PoolB ThreadFactory] 创建线程: PoolB-Worker, 优先级设为: 1, 创建者: main
[PoolB 任务开始]当前线程: PoolB-Worker, 优先级: 1→ 向 PoolA 提交 CPU 密集型计算任务...[PoolA ThreadFactory] 创建线程: PoolA-Worker, 初始优先级: 1, 当前创建线程: PoolB-Worker (优先级=1)
[PoolA 计算任务开始]计算线程: PoolA-Worker, 优先级: 1计算完成,结果摘要: 880[PoolB 任务结束]
=== 程序结束 ===关键发现:PoolA 的线程工厂中我们没有显式设置优先级,但 PoolA-Worker 的优先级却是 1,而不是默认的 5。
3. 原理深挖:Java 线程优先级继承机制
3.1 Thread 构造函数源码
答案藏在 java.lang.Thread 的构造函数中:
// Thread.java (JDK 17)private Thread(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, boolean inheritThreadLocals) { // ... Thread parent = currentThread(); // 关键:获取当前线程作为"父线程" // ... this.priority = parent.getPriority(); // 继承父线程的优先级 // ...}当你 new Thread(r, "PoolA-Worker") 时,新线程的优先级会继承 Thread.currentThread() 的优先级。这里的 currentThread() 不是 main 线程,而是实际执行 submit() 的那个线程。
3.2 线程池的懒加载机制
ThreadPoolExecutor 的核心线程是懒创建的——只有当任务提交时,才会通过 ThreadFactory.newThread() 创建新线程。这意味着:
提交任务的线程(父线程) → ThreadFactory.newThread() → 新线程继承父线程优先级在我们的实验中,调用链是:
PoolB-Worker (优先级=1) └─ poolA.submit(task) └─ ThreadFactory.newThread() // 此时 currentThread() 是 PoolB-Worker └─ new Thread(r, "PoolA-Worker") // 继承优先级 1所以 PoolA 的线程虽然没有显式设置优先级,却因为是被优先级为 1 的线程触发创建的,继承了优先级 1。
3.3 ThreadGroup 的优先级上限
还有一个容易忽略的细节:ThreadGroup 也会限制线程优先级的上限。
if (priority > g.getMaxPriority()) { priority = g.getMaxPriority();}即使你 setPriority(10),如果所在 ThreadGroup 的 maxPriority 是 5,实际优先级也只会是 5。不过在大多数应用场景中,默认 ThreadGroup 的 maxPriority 是 10,所以这个限制通常不会触发。
4. 生产环境中的两种场景
回到我的风控跑批场景,实际上出现了两种不同的情况:
场景一:核心线程全部由低优先级线程创建(完全继承)
这就是实验中复现的情况。当计算线程池在跑批开始前没有被使用过,所有核心线程都是在跑批过程中由低优先级的调度线程触发创建的:
调度线程 (优先级=1) → submit → 计算线程池创建 Worker-1 (继承优先级=1)调度线程 (优先级=1) → submit → 计算线程池创建 Worker-2 (继承优先级=1)调度线程 (优先级=1) → submit → 计算线程池创建 Worker-3 (继承优先级=1)...结果:所有计算线程优先级都是 1,效果最理想。
场景二:核心线程已被其他线程创建(部分继承)
这是更常见也更隐蔽的情况。如果计算线程池在跑批之前已经被其他模块使用过(比如实时请求触发了一些计算),部分核心线程已经被 main 线程或 RPC 线程(优先级=5)创建好了:
系统启动阶段: RPC线程 (优先级=5) → submit → 计算线程池创建 Worker-1 (继承优先级=5) RPC线程 (优先级=5) → submit → 计算线程池创建 Worker-2 (继承优先级=5)
跑批阶段(任务量激增,超过核心线程数): 调度线程 (优先级=1) → submit → 计算线程池创建 Worker-3 (继承优先级=1) 调度线程 (优先级=1) → submit → 计算线程池创建 Worker-4 (继承优先级=1)结果:Worker-1、Worker-2 优先级为 5,Worker-3、Worker-4 优先级为 1。同一个线程池中出现了优先级不一致的线程,行为变得不可预测。
场景对比
| 维度 | 场景一(全部继承) | 场景二(部分继承) |
|---|---|---|
| 核心线程创建时机 | 跑批时首次创建 | 启动阶段已创建部分 |
| 线程优先级分布 | 全部为 1 | 混合(5 和 1) |
| CPU 让步效果 | 一致且可预期 | 不一致,部分线程仍抢占 CPU |
| 排查难度 | 低 | 高,表现不稳定 |
5. 为什么降低优先级能缓解 CPU 问题?
这里需要理解操作系统的线程调度机制:
- Java 线程优先级会映射到操作系统的原生线程优先级(Linux 上通过
nice值) - 当 CPU 资源紧张时,调度器会倾向于给高优先级线程分配更多时间片
- 降低计算线程的优先级后,RPC 线程(默认优先级 5)在 CPU 争抢时能获得更多时间片
但需要注意:
- 在 Linux 上,Java 线程优先级的实际效果取决于调度策略(CFS 调度器下效果有限)
- 优先级只是一个「提示」,不是硬性保证
- 在 CPU 不饱和时,优先级几乎没有影响
所以降低优先级只是「缓解」而非「根治」,真正的解决方案需要从架构层面入手。
6. 拓展:更可靠的解决方案
6.1 在 ThreadFactory 中显式设置优先级
最直接的方式——不要依赖继承,在 ThreadFactory 中显式指定:
ExecutorService computePool = Executors.newFixedThreadPool(4, r -> { Thread t = new Thread(r, "Compute-Worker"); t.setPriority(Thread.MIN_PRIORITY); // 显式设置,不依赖继承 return t;});这样无论是谁触发了线程创建,优先级都是确定的。
6.2 使用 prestartAllCoreThreads() 预热线程
如果你希望核心线程在特定时机、由特定线程创建,可以使用预热:
ThreadPoolExecutor pool = new ThreadPoolExecutor( 4, 8, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), r -> { Thread t = new Thread(r, "Compute-Worker"); // 不设置优先级,依赖创建者线程的优先级 return t; });
// 在 main 线程(优先级=5)中预热所有核心线程pool.prestartAllCoreThreads();但这种方式仍然无法控制超过核心线程数后新建线程的优先级,所以不如方案一可靠。
6.3 CPU 资源隔离(推荐)
在生产环境中,更推荐从资源层面隔离:
# 使用 cgroup 限制跑批进程的 CPU 使用率# 例如限制为 4 核(假设机器有 8 核)cgcreate -g cpu:/batch_jobsecho 400000 > /sys/fs/cgroup/cpu/batch_jobs/cpu.cfs_quota_usecho 100000 > /sys/fs/cgroup/cpu/batch_jobs/cpu.cfs_period_us或者在容器化环境中,为跑批任务分配独立的 Pod 并设置 CPU limits。
6.4 任务限流与错峰
从业务层面控制并发度:
// 使用 Semaphore 限制同时执行的计算任务数private final Semaphore computePermits = new Semaphore(4);
public void submitCompute(Runnable task) { computePool.submit(() -> { try { computePermits.acquire(); task.run(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { computePermits.release(); } });}6.5 分时调度
将跑批任务拆分为小批次,每批之间主动让出 CPU:
// 每处理 N 条数据后,主动 sleep 一小段时间for (int i = 0; i < dataList.size(); i++) { process(dataList.get(i)); if (i % 100 == 0) { Thread.sleep(10); // 给其他线程喘息的机会 }}7. 总结
| 要点 | 说明 |
|---|---|
| 线程优先级继承 | new Thread() 会继承 currentThread() 的优先级 |
| 线程池懒加载 | 核心线程在首次提交任务时才创建,创建者决定了优先级 |
| 隐蔽的不一致 | 同一线程池中的线程可能因创建时机不同而拥有不同优先级 |
| 优先级不是银弹 | Linux CFS 调度器下效果有限,只能缓解不能根治 |
| 最佳实践 | 永远在 ThreadFactory 中显式设置优先级,不要依赖继承 |
核心教训:永远不要假设线程池中线程的优先级是你期望的值。 如果你关心优先级,就在 ThreadFactory 中显式设置它。线程池的懒加载 + 优先级继承,是一个非常隐蔽的坑,尤其在多线程池嵌套调用的场景下。