A Brief Look at the Java SPI Mechanism and Its Use in JDBC and Flink

API vs SPI
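The concept of an API (Application Programming Interface) is nothing new to us. In everyday development we call the APIs provided by platforms and frameworks, and downstream applications in turn call the APIs exposed by their upstream. In a nutshell, an API is defined and implemented by the side that provides a capability, and the application only has to call it.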
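Now suppose we are the service provider and want to enrich an existing system with a fairly complete capability it did not have before. Hacking the code directly would mean creating or changing many APIs and rebuilding the affected modules, and it would be hard to keep the new module consistent with the existing ones. SPI (Service Provider Interface) makes this kind of plug-in development much easier, and Java 6 introduced java.util.ServiceLoader as the standard API for it. As the name suggests, SPI still follows interface-based programming: the system defines the interface, service providers extend the system by implementing it, and the SPI mechanism then takes care of discovering and injecting those implementations. In other words, SPI is both an extension contract that a system deliberately opens up to third parties and a mechanism for loading extension points dynamically.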
The difference between API and SPI is illustrated in the figure below.

How SPI Works
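To use the SPI mechanism as a service provider, we follow its conventions:
First, write an implementation of the service interface, i.e. the service provider class, which must have a public no-argument constructor;
Next, in the META-INF/services directory on the classpath, create a UTF-8 text file named after the fully qualified name of the interface, and write the fully qualified name of the implementation class into it (one class per line if there are several);
Finally, call the load() method of the JDK's java.util.ServiceLoader, which discovers and loads the concrete service implementations based on that file.
Let's take a quick look at the ServiceLoader source code. The listings below are from JDK 8; ServiceLoader was substantially rewritten in JDK 9 to support the module system. First, a few important fields; the comments speak for themselves.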
```java
private static final String PREFIX = "META-INF/services/";
// The class or interface representing the service being loaded
private final Class<S> service;
// The class loader used to locate, load, and instantiate providers
private final ClassLoader loader;
// The access control context taken when the ServiceLoader is created
private final AccessControlContext acc;
// Cached providers, in instantiation order
private LinkedHashMap<String,S> providers = new LinkedHashMap<>();
// The current lazy-lookup iterator
private LazyIterator lookupIterator;
```

Tracing down from the load() method:
```java
public static <S> ServiceLoader<S> load(Class<S> service) {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    return ServiceLoader.load(service, cl);
}

public static <S> ServiceLoader<S> load(Class<S> service,
                                        ClassLoader loader)
{
    return new ServiceLoader<>(service, loader);
}

private ServiceLoader(Class<S> svc, ClassLoader cl) {
    service = Objects.requireNonNull(svc, "Service interface cannot be null");
    loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
    acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
    reload();
}

public void reload() {
    providers.clear();
    lookupIterator = new LazyIterator(service, loader);
}
```

The single-argument load() uses the thread context class loader. This is what allows a class loaded by the bootstrap class loader, such as java.sql.DriverManager, to find providers on the application classpath. LazyIterator is an iterator that loads service providers lazily (ServiceLoader itself implements the Iterable interface), and it is held in lookupIterator. In practice, we call ServiceLoader#iterator() to get the loaded service providers. Its code is as follows.
```java
public Iterator<S> iterator() {
    return new Iterator<S>() {
        Iterator<Map.Entry<String,S>> knownProviders
            = providers.entrySet().iterator();

        public boolean hasNext() {
            if (knownProviders.hasNext())
                return true;
            return lookupIterator.hasNext();
        }

        public S next() {
            if (knownProviders.hasNext())
                return knownProviders.next().getValue();
            return lookupIterator.next();
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }
    };
}
```

This method returns a standard iterator. It first serves providers from the providers cache, and once the cache is exhausted it falls back to lazy loading through lookupIterator. The relevant parts of the inner class LazyIterator are shown below.
```java
private class LazyIterator implements Iterator<S> {
    Class<S> service;
    ClassLoader loader;
    Enumeration<URL> configs = null;
    Iterator<String> pending = null;
    String nextName = null;

    private LazyIterator(Class<S> service, ClassLoader loader) {
        this.service = service;
        this.loader = loader;
    }

    // LazyIterator.hasNext() calls this method
    private boolean hasNextService() {
        if (nextName != null) {
            return true;
        }
        if (configs == null) {
            try {
                String fullName = PREFIX + service.getName();
                if (loader == null)
                    configs = ClassLoader.getSystemResources(fullName);
                else
                    configs = loader.getResources(fullName);
            } catch (IOException x) {
                fail(service, "Error locating configuration files", x);
            }
        }
        while ((pending == null) || !pending.hasNext()) {
            if (!configs.hasMoreElements()) {
                return false;
            }
            pending = parse(service, configs.nextElement());
        }
        nextName = pending.next();
        return true;
    }

    // LazyIterator.next() calls this method
    private S nextService() {
        if (!hasNextService())
            throw new NoSuchElementException();
        String cn = nextName;
        nextName = null;
        Class<?> c = null;
        try {
            c = Class.forName(cn, false, loader);
        } catch (ClassNotFoundException x) {
            fail(service,
                 "Provider " + cn + " not found");
        }
        if (!service.isAssignableFrom(c)) {
            fail(service,
                 "Provider " + cn + " not a subtype");
        }
        try {
            S p = service.cast(c.newInstance());
            providers.put(cn, p);
            return p;
        } catch (Throwable x) {
            fail(service,
                 "Provider " + cn + " could not be instantiated",
                 x);
        }
        throw new Error();          // This cannot happen
    }

    // ......
}
```

Look closely at hasNextService() and nextService().

hasNextService() locates the SPI configuration files described earlier, parses them, and yields the provider class names one at a time.

nextService() handles each name in turn. It loads the named class with Class.forName() without initializing it, checks that the class is a subtype of the service, creates an instance via reflection, and caches that instance in providers.

Nothing is located or instantiated until the iterator is advanced, which is why the loading is lazy. It also follows that SPI cannot hand you one particular implementation on demand. To find the one you want, you have to iterate over the providers and instantiate each of them along the way, and a full iteration loads every extension point on the classpath. This is the main drawback of the mechanism.
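The three conventions and the lazy, cached iteration described above can be seen together in a minimal self-contained sketch. The interface Greeter and its two providers are hypothetical. To keep everything in one file, the code writes the META-INF/services file into a temporary directory at runtime and exposes it through a URLClassLoader, instead of packaging a JAR. An instantiation counter makes the behavior visible: stopping after the first provider leaves the second one uninstantiated, and a second pass reuses the cached first instance.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicInteger;

public class SpiDemo {
    // The service interface (the "SPI") published by the framework.
    public interface Greeter {
        String greet(String name);
    }

    // Counts how many provider instances have been created so far.
    static final AtomicInteger CREATED = new AtomicInteger();

    // Hypothetical providers: public, each with a public no-arg constructor.
    public static class EnglishGreeter implements Greeter {
        public EnglishGreeter() { CREATED.incrementAndGet(); }
        public String greet(String name) { return "Hello, " + name; }
    }

    public static class FrenchGreeter implements Greeter {
        public FrenchGreeter() { CREATED.incrementAndGet(); }
        public String greet(String name) { return "Bonjour, " + name; }
    }

    // Writes META-INF/services/<service binary name> into a temp directory
    // and returns a class loader that can see that file.
    static ClassLoader loaderWithConfigFile() throws IOException {
        Path root = Files.createTempDirectory("spi-demo");
        Path dir = Files.createDirectories(root.resolve("META-INF").resolve("services"));
        String lines = EnglishGreeter.class.getName() + "\n"
                + FrenchGreeter.class.getName() + "\n";
        Files.write(dir.resolve(Greeter.class.getName()),
                lines.getBytes(StandardCharsets.UTF_8));
        // The parent loader already knows the provider classes themselves.
        return new URLClassLoader(new URL[] {root.toUri().toURL()},
                SpiDemo.class.getClassLoader());
    }

    public static List<String> run() throws IOException {
        CREATED.set(0);
        ServiceLoader<Greeter> loader =
                ServiceLoader.load(Greeter.class, loaderWithConfigFile());
        List<String> out = new ArrayList<>();

        // Take only the first provider: the second one is not instantiated yet.
        Greeter first = loader.iterator().next();
        out.add(first.greet("SPI") + ", created=" + CREATED.get());

        // A full pass serves the cached first instance, then creates the second.
        List<Greeter> all = new ArrayList<>();
        loader.forEach(all::add);
        out.add(all.size() + " providers, created=" + CREATED.get()
                + ", cachedFirst=" + (all.get(0) == first));
        return out;
    }

    public static void main(String[] args) throws IOException {
        run().forEach(System.out::println);
    }
}
```

Running main() prints "Hello, SPI, created=1" followed by "2 providers, created=2, cachedFirst=true". In a real project the configuration file would simply live under src/main/resources/META-INF/services/ and end up inside the provider's JAR.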
Next, JDBC and Flink serve as brief examples of how SPI is used in practice.
SPI in JDBC
JDBC is the unified interface through which Java programs access databases. Databases vary enormously, so the SPI mechanism is a natural way to make database drivers pluggable.
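With older versions of JDBC (before JDBC 4.0, which shipped with Java 6), we first had to call something like Class.forName("com.mysql.jdbc.Driver") to load the database driver manually via reflection. With newer JDBC this is no longer necessary: calling DriverManager.getConnection() directly is enough to obtain a database connection. Here is part of the loadInitialDrivers() method, which the static initializer of java.sql.DriverManager calls (JDK 8):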
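```java
private static void loadInitialDrivers() {
    // ......
    AccessController.doPrivileged(new PrivilegedAction<Void>() {
        public Void run() {
            ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class);
            Iterator<Driver> driversIterator = loadedDrivers.iterator();
            try {
                while (driversIterator.hasNext()) {
                    // next() instantiates the driver class; its static
                    // initializer registers it with DriverManager
                    driversIterator.next();
                }
            } catch (Throwable t) {
                // Do nothing
            }
            return null;
        }
    });
    // ......
}
```

So the SPI mechanism is what finds and loads the driver provider classes, i.e. the implementations of the java.sql.Driver interface. Taking the MySQL JDBC driver as an example, its META-INF/services directory contains a file named java.sql.Driver: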

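Its contents are:

```
com.mysql.jdbc.Driver
com.mysql.fabric.jdbc.FabricMySQLDriver
```

Each driver class registers itself by calling DriverManager#registerDriver() in its static initializer. Several JDBC drivers may be loaded at once (for example MySQL and PostgreSQL). When a connection is requested, DriverManager iterates over all registered driver instances and calls connect() on each in turn to see whether it can establish the connection. It returns the first connection that succeeds. See DriverManager#getConnection() for details.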
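The connection lookup in DriverManager#getConnection() boils down to a simple loop over the registered drivers. The sketch below is a simplified model, not the JDK's code. MiniDriver and MiniDriverManager are hypothetical names, connections are plain strings, and the class-loader visibility check that the real DriverManager performs before trying each driver is omitted. As with java.sql.Driver#connect, a driver returns null when the URL is not one it handles.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class MiniDriverManager {
    // Hypothetical stand-in for java.sql.Driver.
    public interface MiniDriver {
        // Returns null if this driver does not handle the URL.
        String connect(String url) throws Exception;
    }

    private static final List<MiniDriver> DRIVERS = new CopyOnWriteArrayList<>();

    // Real drivers call DriverManager.registerDriver() from a static initializer.
    public static void registerDriver(MiniDriver driver) {
        DRIVERS.add(driver);
    }

    // Try each registered driver in order and return the first connection.
    public static String getConnection(String url) throws Exception {
        Exception firstFailure = null;
        for (MiniDriver driver : DRIVERS) {
            try {
                String conn = driver.connect(url);
                if (conn != null) {
                    return conn;
                }
            } catch (Exception e) {
                // Remember the first failure but keep trying the other drivers.
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
        throw new IllegalStateException("No suitable driver found for " + url);
    }

    public static void main(String[] args) throws Exception {
        registerDriver(url -> url.startsWith("jdbc:mysql:") ? "mysql-connection" : null);
        registerDriver(url -> url.startsWith("jdbc:postgresql:") ? "postgresql-connection" : null);
        System.out.println(getConnection("jdbc:postgresql://localhost/test"));
    }
}
```

Here main() prints "postgresql-connection": the MySQL driver declines the URL by returning null, so the loop moves on to the next registered driver.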
SPI in Flink
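The SPI mechanism is also widely used in Flink's Table module. Flink Table involves many kinds of table-related components, which makes it a good fit for plug-ins as well. org.apache.flink.table.factories.TableFactory is the SPI factory interface that Flink provides, and its Javadoc says so explicitly.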
```java
/**
 * A factory to create different table-related instances from string-based properties. This
 * factory is used with Java's Service Provider Interfaces (SPI) for discovering. A factory is
 * called with a set of normalized properties that describe the desired configuration. The factory
 * allows for matching to the given set of properties.
 *
 * Classes that implement this interface can be added to the
 * "META-INF/services/org.apache.flink.table.factories.TableFactory" file of a JAR file in
 * the current classpath to be found.
 *
 * @see TableFormatFactory
 */
@PublicEvolving
public interface TableFactory {
    Map<String, String> requiredContext();
    List<String> supportedProperties();
}
```
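Take the Flink Hive connector as an example. Its META-INF/services directory contains a file named org.apache.flink.table.factories.TableFactory, with the following contents:

```
org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory
org.apache.flink.table.module.hive.HiveModuleFactory
```

So how does Flink make sure that the right TableFactory implementation gets loaded? Following the call chain, we arrive at TableFactoryService#findSingleInternal().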
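```java
private static <T extends TableFactory> T findSingleInternal(
        Class<T> factoryClass,
        Map<String, String> properties,
        Optional<ClassLoader> classLoader) {

    List<TableFactory> tableFactories = discoverFactories(classLoader);
    List<T> filtered = filter(tableFactories, factoryClass, properties);

    if (filtered.size() > 1) {
        throw new AmbiguousTableFactoryException(
            filtered,
            factoryClass,
            tableFactories,
            properties);
    } else {
        return filtered.get(0);
    }
}
```

Here, discoverFactories() discovers and loads all Table service providers. filter() then keeps only the ones that fit the current request, meaning they implement factoryClass and match the given properties on both required context and supported properties.

If more than one candidate is left, an AmbiguousTableFactoryException is thrown. If none is left, filter() itself throws a NoMatchingTableFactoryException, so filtered.get(0) is never reached with an empty list.

discoverFactories() ultimately delegates to ServiceLoader, as follows: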
```java
private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
    try {
        List<TableFactory> result = new LinkedList<>();
        ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader());
        ServiceLoader
            .load(TableFactory.class, cl)
            .iterator()
            .forEachRemaining(result::add);
        return result;
    } catch (ServiceConfigurationError e) {
        LOG.error("Could not load service provider for table factories.", e);
        throw new TableException("Could not load service provider for table factories.", e);
    }
}
```
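The discover-then-filter approach of findSingleInternal() can be reduced to a few lines. The sketch below is a deliberately simplified analogue, not Flink's implementation. SimpleFactory, FactoryMatcher and the property keys are hypothetical. A factory matches when every entry of its requiredContext() appears in the given properties, and both "no match" and "more than one match" are treated as errors, echoing findSingleInternal(). discover() uses ServiceLoader the same way discoverFactories() does. main() passes a hard-coded list instead, because no META-INF/services file is registered for SimpleFactory here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;

public class FactoryMatcher {
    // Hypothetical, heavily simplified analogue of Flink's TableFactory.
    public interface SimpleFactory {
        Map<String, String> requiredContext();
    }

    // Discovery step: same ServiceLoader pattern as discoverFactories().
    public static List<SimpleFactory> discover(ClassLoader cl) {
        List<SimpleFactory> result = new ArrayList<>();
        ServiceLoader.load(SimpleFactory.class, cl).iterator().forEachRemaining(result::add);
        return result;
    }

    // Filter step: keep factories whose required context is contained in props.
    public static List<SimpleFactory> filter(List<SimpleFactory> found, Map<String, String> props) {
        List<SimpleFactory> matched = new ArrayList<>();
        for (SimpleFactory f : found) {
            if (props.entrySet().containsAll(f.requiredContext().entrySet())) {
                matched.add(f);
            }
        }
        return matched;
    }

    // Exactly one factory must match; otherwise the request is rejected.
    public static SimpleFactory findSingle(List<SimpleFactory> found, Map<String, String> props) {
        List<SimpleFactory> matched = filter(found, props);
        if (matched.isEmpty()) {
            throw new IllegalArgumentException("No matching factory for " + props);
        }
        if (matched.size() > 1) {
            throw new IllegalStateException("Ambiguous factories for " + props);
        }
        return matched.get(0);
    }

    // Builds a map from alternating keys and values, e.g. props("type", "hive").
    public static Map<String, String> props(String... keyValues) {
        Map<String, String> map = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            map.put(keyValues[i], keyValues[i + 1]);
        }
        return map;
    }

    public static void main(String[] args) {
        SimpleFactory hive = () -> props("type", "hive");
        SimpleFactory memory = () -> props("type", "in-memory");
        List<SimpleFactory> found = new ArrayList<>();
        found.add(hive);
        found.add(memory);
        SimpleFactory chosen = findSingle(found, props("type", "hive", "version", "1"));
        System.out.println(chosen == hive);
    }
}
```

main() prints true: only the factory whose required context (type=hive) is contained in the request survives the filter. Extra request properties such as version do not prevent a match.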