ThreadLocal 不香了?ScopedValue才是王道?
共 21569字,需瀏覽 44分鐘
·
2024-07-28 10:15
閱讀本文大概需要 15 分鐘。
來(lái)自:網(wǎng)絡(luò),侵刪
remove()會(huì)造成嚴(yán)重的內(nèi)存泄露問(wèn)題。在JDK 20 Early-Access Build 28版本中便針對(duì)ThreadLocal類重新設(shè)計(jì)了一個(gè)ScopedValue類。
ThreadLocal
基本概念
應(yīng)用案例
@Service
public class ShoppingCartService {
private ThreadLocal<ShoppingCart> cartHolder = new ThreadLocal<>();
public ShoppingCart getCurrentCart() {
ShoppingCart cart = cartHolder.get();
if (cart == null) {
cart = new ShoppingCart();
cartHolder.set(cart);
}
return cart;
}
public void checkout() {
// 獲取當(dāng)前購(gòu)物車
ShoppingCart cart = getCurrentCart();
// 執(zhí)行結(jié)賬操作
// 清除當(dāng)前線程中購(gòu)物車的信息,防止內(nèi)存泄露
cartHolder.remove();
}
}
// 購(gòu)物車類
class ShoppingCart {
private List<Product> products = new ArrayList<>();
public void addProduct(Product product) {
products.add(product);
}
public List<Product> getProducts() {
return products;
}
}
ShoppingCartService 是一個(gè) Spring Bean,用來(lái)管理購(gòu)物車信息。在這個(gè) Bean 里,使用了 ThreadLocal<ShoppingCart> 來(lái)保存每個(gè)線程的購(gòu)物車信息。getCurrentCart 方法首先會(huì)從 ThreadLocal 中獲取購(gòu)物車信息,如果當(dāng)前線程沒(méi)有對(duì)應(yīng)的購(gòu)物車信息,那么就創(chuàng)建一個(gè)新的購(gòu)物車,并保存到 ThreadLocal 中。checkout 方法用來(lái)執(zhí)行結(jié)賬操作,結(jié)賬完成后,需要通過(guò)cartHolder.remove();清除當(dāng)前線程中的購(gòu)物車信息,以防止內(nèi)存泄露。這樣,即使在多線程環(huán)境下,每個(gè)線程都有自己獨(dú)立的購(gòu)物車信息,互不影響。這就是 ThreadLocal 在解決 Spring Bean 線程安全問(wèn)題上的一個(gè)應(yīng)用場(chǎng)景。
@Aspect
@Component
public class UserConsistencyAspect {
// 每個(gè)UserVo啟用線程隔離,在進(jìn)入切面后開(kāi)始創(chuàng)建,在業(yè)務(wù)邏輯中用完就被GC回收
private static final ThreadLocal<UserVo> userHolder = new ThreadLocal<>();
@Pointcut("@annotation(org.nozomi.common.annotation.GetUser)")
public void userAuthPoint() {}
@Around("userAuthPoint()")
public Object injectUserFromRequest(ProceedingJoinPoint joinPoint) throws Throwable {
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
UserVo operator = (UserVo) authentication.getPrincipal();
if (operator == null) {
return Response.fail("用戶不存在");
}
userHolder.set(operator);
return joinPoint.proceed();
}
/**
* 取出當(dāng)前線程中的UserVo對(duì)象,這些UserVo是跟隨http創(chuàng)建的線程而隔離的
*
* @return 當(dāng)前線程的UserVo
*/
public static UserVo getUser() {
return userHolder.get();
}
}
UserConsistencyAspect .getUser()方法就可以獲取到這個(gè)http session中的User對(duì)象了。
@Service
public class ProductService {
private final ThreadLocal<Session> sessionThreadLocal = new ThreadLocal<>();
public Product getProductById(String id) {
Session session = getSession();
return session.get(Product.class, id);
}
public void updateProduct(Product product) {
Session session = getSession();
session.update(product);
}
private Session getSession() {
Session session = sessionThreadLocal.get();
if (session == null) {
session = sessionFactory.openSession();
sessionThreadLocal.set(session);
}
return session;
}
public void closeSession() {
Session session = sessionThreadLocal.get();
if (session != null) {
session.close();
sessionThreadLocal.remove();
}
}
}
getSession()方法時(shí),都會(huì)從ThreadLocal中獲取到屬于自己的Session。但是事實(shí)上這些session的處理已經(jīng)在mybatis或hibernate中都已經(jīng)通過(guò)ThreadLocal處理好了不需要開(kāi)發(fā)者再在業(yè)務(wù)中對(duì)session進(jìn)行隔離。這里的例子主要是為了解釋 ThreadLocal 是如何工作的,并不是實(shí)際開(kāi)發(fā)中推薦的做法。
StructuredTaskScope
Callable的形式向它提交任務(wù),我們將得到一個(gè)future返回,并且這個(gè)callable將在由作用域Scope為我們創(chuàng)建的虛線程種執(zhí)行。這很像Executor。但二者之間也有很大的區(qū)別。
public static Weather readWeather() throws Exception {
// try-with-resource
try(var scope = new StructuredTaskScope<Weather>()) {
Future<Weather> future = scope.fork(Weather::readWeatherFrom);
scope.join();
return future.resultNow();
}
}
try-with-resource模式。通過(guò)fork()方法fork一個(gè)Callable類型的任務(wù),fork()方法返回一個(gè)Future對(duì)象,我們調(diào)用join()方法阻塞調(diào)用,它將阻塞當(dāng)前線程,直到所有提交(frok)給StructuredTaskScope的任務(wù)都完成。最后調(diào)用Future的resultNow()獲取結(jié)果并返回。resultNow()將拋出異常,如果我們?cè)贔uture完成前調(diào)用它,所以我們要在join()方法中調(diào)用并將其返回。
ScopedValue
基本概念
-
ThreadLocal變量是可變的,任何運(yùn)行在當(dāng)前線程中的代碼都可以修改該變量的值,很容易產(chǎn)生一些難以調(diào)試的bug。 -
ThreadLocal變量的生命周期會(huì)很長(zhǎng)。當(dāng)使用ThreadLocal變量的 set方法,為當(dāng)前線程設(shè)置了值之后,這個(gè)值在線程的整個(gè)生命周期中都會(huì)保留,直到調(diào)用remove方法來(lái)刪除。但是絕大部分開(kāi)發(fā)人員不會(huì)主動(dòng)調(diào)用remove來(lái)進(jìn)行刪除,這可能造成內(nèi)存泄漏。 -
ThreadLocal變量可以被繼承。如果一個(gè)子線程從父線程中繼承ThreadLocal變量,那么該子線程需要獨(dú)立存儲(chǔ)父線程中的全部ThreadLocal變量,這會(huì)產(chǎn)生比較大的內(nèi)存開(kāi)銷。
基本用法
jdk.incubator.concurrent包中的ScopedValue類來(lái)表示。使用ScopedValue的第一步是創(chuàng)建ScopedValue對(duì)象,通過(guò)靜態(tài)方法newInstance來(lái)完成,ScopedValue對(duì)象一般聲明為static final。由于ScopedValue是孵化功能,要想使用需要在項(xiàng)目的第一級(jí)包目錄的同級(jí)目錄中創(chuàng)建一個(gè)java類module-info.java來(lái)將其引入模塊中:
module dioxide.cn.module {
requires jdk.incubator.concurrent;
}
VM Option中啟用預(yù)覽功能--enable-preview。下一步是指定ScopedValue對(duì)象的值和作用域,通過(guò)靜態(tài)方法where來(lái)完成。where方法有 3 個(gè)參數(shù):
-
ScopedValue對(duì)象 -
ScopedValue對(duì)象所綁定的值 -
Runnable或Callable對(duì)象,表示ScopedValue對(duì)象的作用域
Runnable或Callable對(duì)象執(zhí)行過(guò)程中,其中的代碼可以用ScopedValue對(duì)象的get方法獲取到where方法調(diào)用時(shí)綁定的值。這個(gè)作用域是動(dòng)態(tài)的,取決于Runnable或Callable對(duì)象所調(diào)用的方法,以及這些方法所調(diào)用的其他方法。當(dāng)Runnable或Callable對(duì)象執(zhí)行完成之后,ScopedValue對(duì)象會(huì)失去綁定,不能再通過(guò)get方法獲取值。在當(dāng)前作用域中,ScopedValue對(duì)象的值是不可變的,除非再次調(diào)用where方法綁定新的值。這個(gè)時(shí)候會(huì)創(chuàng)建一個(gè)嵌套的作用域,新的值僅在嵌套的作用域中有效。使用作用域值有以下幾個(gè)優(yōu)勢(shì):
-
提高數(shù)據(jù)安全性:由于作用域值只能在當(dāng)前范圍內(nèi)訪問(wèn),因此可以避免數(shù)據(jù)泄露或被惡意修改。 -
提高數(shù)據(jù)效率:由于作用域值是不可變的,并且可以在線程之間共享,因此可以減少數(shù)據(jù)復(fù)制或同步的開(kāi)銷。 -
提高代碼清晰度:由于作用域值只能在當(dāng)前范圍內(nèi)訪問(wèn),因此可以減少參數(shù)傳遞或全局變量的使用。
public class Main {
// 聲明了一個(gè)靜態(tài)的、最終的 ScopedValue<String> 實(shí)例
// ScopedValue 是一個(gè)支持在特定范圍內(nèi)(如任務(wù)或線程)中傳遞值的類
// 它的使用類似于 ThreadLocal,但更適合于結(jié)構(gòu)化并發(fā)
private static final ScopedValue<String> VALUE = ScopedValue.newInstance();
public static void main(String[] args) throws Exception {
System.out.println(Arrays.toString(stringScope()));
}
public static Object[] stringScope() throws Exception {
return ScopedValue.where(VALUE, "value", () -> {
// 使用 try-with-resource 來(lái)綁定結(jié)構(gòu)化并發(fā)的作用域
// 用于自動(dòng)管理資源的生命周期,這是一個(gè)結(jié)構(gòu)化任務(wù)范圍
// 在這個(gè)范圍內(nèi)創(chuàng)建的所有子任務(wù)都將被視為范圍的一部分
// 如果范圍中的任何任務(wù)失敗,所有其他任務(wù)都將被取消
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 使用了 scope.fork 來(lái)創(chuàng)建兩個(gè)并行的任務(wù)
// 每個(gè)任務(wù)都在執(zhí)行上下文中獲取 VALUE 的值,并對(duì)其進(jìn)行操作
Future<String> user = scope.fork(VALUE::get);
Future<Integer> order = scope.fork(() -> VALUE.get().length());
// join() 方法等待所有范圍內(nèi)的任務(wù)完成
// throwIfFailed() 方法會(huì)檢查所有任務(wù)的結(jié)果,如果任何任務(wù)失敗,則會(huì)拋出異常
scope.join().throwIfFailed();
// 在所有任務(wù)完成后,使用 resultNow() 方法獲取每個(gè)任務(wù)的結(jié)果,并將結(jié)果放入一個(gè)對(duì)象數(shù)組中
return new Object[]{user.resultNow(), order.resultNow()};
}
});
}
}
ScopedValue和結(jié)構(gòu)化并發(fā)來(lái)創(chuàng)建并執(zhí)行多個(gè)并行任務(wù),并安全地傳遞和操作任務(wù)上下文中的值。
源碼分析
A value that is set once and is then available for reading for a bounded period of execution by a thread. A ScopedValue allows for safely and efficiently sharing data for a bounded period of execution without passing the data as method arguments. ScopedValue defines the where(ScopedValue, Object, Runnable) method to set the value of a ScopedValue for the bouned period of execution by a thread of the runnable's run method. The unfolding execution of the methods executed by run defines a dynamic scope. The scoped value is bound while executing in the dynamic scope, it reverts to being unbound when the run method completes (normally or with an exception). Code executing in the dynamic scope uses the ScopedValue get method to read its value. Like a thread-local variable, a scoped value has multiple incarnations, one per thread. The particular incarnation that is used depends on which thread calls its methods.
ScopedValue 是一個(gè)對(duì)象,它被設(shè)置一次后,在執(zhí)行期間由一個(gè)線程有限期地讀取。ScopedValue 允許在有限的執(zhí)行期間內(nèi)在不將數(shù)據(jù)作為方法參數(shù)傳遞的情況下安全、有效地共享數(shù)據(jù)。ScopedValue 定義了 where(ScopedValue, Object, Runnable) 方法,這個(gè)方法在一個(gè)線程執(zhí)行 runnable 的 run 方法的有限執(zhí)行期間內(nèi)設(shè)置 ScopedValue 的值。由 run 執(zhí)行的方法展開(kāi)執(zhí)行定義了一個(gè)動(dòng)態(tài)作用域。在動(dòng)態(tài)作用域中執(zhí)行時(shí),作用域值是綁定的,當(dāng) run 方法完成時(shí)(正常或異常),它恢復(fù)到未綁定狀態(tài)。在動(dòng)態(tài)作用域中執(zhí)行的代碼使用 ScopedValue 的 get 方法來(lái)讀取其值。與線程局部變量類似,作用域值有多個(gè)化身,每個(gè)線程一個(gè)。使用哪個(gè)化身取決于哪個(gè)線程調(diào)用其方法。ScopedValue 的一個(gè)典型用法是在 final 和 static 字段中聲明。字段的可訪問(wèn)性將決定哪些組件可以綁定或讀取其值。ScopedValue中有3個(gè)內(nèi)部類,分別是Snapshot、Carrier、Cache,他們?cè)赟copedValue中起著至關(guān)重要的角色。
Snapshot
An immutable map from ScopedValue to values. Unless otherwise specified, passing a null argument to a constructor or method in this class will cause a NullPointerExceptionto be thrown.
NullPointerException異常。這個(gè)類的主要用途是為ScopedValue實(shí)例創(chuàng)建一個(gè)不可變的映射,這樣在運(yùn)行時(shí),無(wú)論其它代碼如何修改原始的ScopedValue實(shí)例,Snapshot中的值都不會(huì)發(fā)生變化。它為了提供一個(gè)安全的方式來(lái)在多線程環(huán)境下共享值。
Carrier
A mapping of scoped values, as keys, to values. A Carrier is used to accumlate mappings so that an operation (a RunnableorCallable) can be executed with all scoped values in the mapping bound to values. The following example runs an operation with k1 bound (or rebound) to v1, and k2 bound (or rebound) to v2.ScopedValue.where(k1, v1).where(k2, v2).run(() -> ... );A Carrier is immutable and thread-safe. The where method returns a new Carrier object, it does not mutate an existing mapping. Unless otherwise specified, passing a null argument to a method in this class will cause aNullPointerExceptionto be thrown.
Runnable或Callable),在該操作中,映射中的所有scoped values都綁定到值。Carrier是不可變的,并且是線程安全的。where方法返回一個(gè)新的Carrier對(duì)象,不會(huì)改變現(xiàn)有的映射。這是用于在ScopedValue實(shí)例和對(duì)應(yīng)值之間創(chuàng)建和保持映射關(guān)系的工具,使得這些映射關(guān)系可以在執(zhí)行操作時(shí)被一并應(yīng)用。
Cache
A small fixed-size key-value cache. When a scoped value's get() method is invoked, we record the result of the lookup in this per-thread cache for fast access in future.
get()方法時(shí),我們?cè)谶@個(gè)每線程緩存中記錄查找的結(jié)果,以便在將來(lái)快速訪問(wèn)。這個(gè)類的主要作用是優(yōu)化性能。通過(guò)緩存get()方法的結(jié)果,可以避免在多次獲取同一個(gè)ScopedValue的值時(shí)進(jìn)行重復(fù)的查找操作。只有當(dāng)ScopedValue的值被更改時(shí),才需要更新緩存。
where()
where()方法是ScopedValue類的核心方法與入口,它接收三個(gè)參數(shù)。當(dāng)操作完成時(shí)(正常或出現(xiàn)異常),ScopedValue將在當(dāng)前線程中恢復(fù)為未綁定狀態(tài),或恢復(fù)為先前綁定時(shí)的先前值。
graph TB
A("ScopedValue.where(key, value, op)")
A --> B("ScopedValue.Carrier.of(key, value)")
B --> C("ScopedValue.Carrier.where(key, value, prev)")
C --> D("返回ScopedValue.Carrier對(duì)象")
op已經(jīng)創(chuàng)建了一個(gè)StructuredTaskScope但沒(méi)有關(guān)閉它,那么退出op會(huì)導(dǎo)致在動(dòng)態(tài)范圍內(nèi)創(chuàng)建的每個(gè)StructuredTaskScope被關(guān)閉。這可能需要阻塞,直到所有子線程都完成了它們的子任務(wù)。關(guān)閉是按照創(chuàng)建它們的相反順序完成的。
ScopedValue.where(key, value, op);等價(jià)于使用ScopedValue.where(key, value).call(op);
public static <T, R> R where(ScopedValue<T> key,
T value,
Callable<? extends R> op) throws Exception {
return where(key, value).call(op);
}
Carrier.of(key, value);方法
/*
* 返回由單個(gè)綁定組成的新集合
*/
static <T> Carrier of(ScopedValue<T> key, T value) {
return where(key, value, null);
}
/**
* 向該map添加綁定,返回一個(gè)新的 Carrier 實(shí)例
*/
private static final <T> Carrier where(ScopedValue<T> key, T value,
Carrier prev) {
return new Carrier(key, value, prev);
}
call()
graph TB
D("ScopedValue.Carrier")
D --> E("ScopedValue.Carrier.call(op)")
E -->|分支1| F("ScopedValue.Cache.invalidate()")
E -->|分支2| G("ScopedValue.Carrier.runWith(newSnapshot, op)")
G --> H("ScopedValueContainer.call(op)")
H --> I("ScopedValueContainer.callWithoutScope(op)")
I --> J("Callable.call()")
小結(jié)
推薦閱讀:
發(fā)現(xiàn)一款 JSON 可視化工具神器,驚艷了!
阿里巴巴MySQL規(guī)范,五千字版,這次全了(建議收藏)
程序員在線工具站:cxytools.com
推薦一個(gè)我自己寫的工具站:http://cxytools.com,專為程序員設(shè)計(jì),包括時(shí)間日期、JSON處理、SQL格式化、隨機(jī)字符串生成、UUID生成、隨機(jī)數(shù)生成、文本Hash...等功能,提升開(kāi)發(fā)效率。
?戳閱讀原文直達(dá)! 朕已閱 
