騰訊阿里頭條翻牌子 | ClickHouse中SQL執行過程
點擊上方藍色字體,選擇“設為星標”
回復“資源”獲取更多資源

用戶提交一條查詢SQL背后發生了什么?



接收客戶端請求
初始化上下文
初始化Zookeeper(ClickHouse的副本復制機制需要依賴ZooKeeper)
常規配置初始化
綁定服務端的端口,根據網絡協議初始化Handler,對客戶端提供服務
// Server entry point (abridged excerpt): creates the global context, sets up the
// ZooKeeper-backed config cache, binds the listening socket, registers the
// protocol servers and starts them.
// NOTE(review): the excerpt shows no return statement although the function is
// declared to return int — presumably elided; confirm against the full source.
int Server::main()
{
// Initialize the global context and mark the application type as SERVER.
global_context = std::make_unique<Context>(Context::createGlobal());
global_context->setApplicationType(Context::ApplicationType::SERVER);
// ZooKeeper initialization: the node cache is given a callback that returns the
// global context's ZooKeeper handle.
zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); });
// Initialization of the remaining configuration (elided in this excerpt).
//...
// Bind the port so the server can serve external clients.
auto address = make_socket_address(host, port);
socket.bind(address, /* reuseAddress = */ true);
// A different server type is created for each network protocol.
// Server types supported at the time of writing: HTTP, HTTPS, TCP, Interserver, MySQL.
// The TCP variant is shown as an example:
create_server("tcp_port", [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this),
server_pool,
socket,
new Poco::Net::TCPServerParams));
});
// Start every server that was registered above.
for (auto & server : servers)
server->start();
}
初始化輸入和輸出流的緩沖區
接受請求報文,拆包
執行Query(包括整個詞法語法分析,Query重寫,物理計劃生成和生成結果)
把Query結果保存到輸出流,然后發送到Socket的緩沖區,等待發送回客戶端
// Connection handler loop (abridged excerpt): wraps the socket in read/write
// buffers, then repeatedly receives a packet, executes the query and processes it.
void TCPHandler::runImpl()
{
// Instantiate the input and output stream buffers over the connection socket.
in = std::make_shared<ReadBufferFromPocoSocket>(socket());
out = std::make_shared<WriteBufferFromPocoSocket>(socket());
while (1){
// Receive the request packet.
receivePacket();
// Execute the query; the result is stored in state.io.
state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);
// The query is processed differently depending on its kind.
// NOTE(review): the excerpt lists the three handlers back to back; presumably the
// full source selects exactly one of them per query — confirm.
// Process an INSERT query.
processInsertQuery();
// Process an ordinary query concurrently.
processOrdinaryQueryWithProcessors();
// Process an ordinary query in a single thread.
processOrdinaryQuery();
}
}
// Parses the query text into an AST, obtains the matching interpreter from
// InterpreterFactory and executes it (abridged excerpt).
// Returns a tuple of the AST and the interpreter's result.
// NOTE(review): `begin`, `end`, `settings`, `context`, `stage`, `max_query_size`
// and `res` are not declared in this excerpt — presumably parameters/locals of the
// full function; confirm.
static std::tuple<ASTPtr, BlockIO> executeQueryImpl()
{
// Construct the parser.
ParserQuery parser(end, settings.enable_debug_queries);
ASTPtr ast;
// Convert the query text into an abstract syntax tree.
ast = parseQuery(parser, begin, end, "", max_query_size);
// Create the interpreter instance for this AST.
auto interpreter = InterpreterFactory::get(ast, context, stage);
// The interpreter processes the AST; per the declared return type the result is a BlockIO.
res = interpreter->execute();
// The return value is a pair made of the AST and the execution result.
return std::make_tuple(ast, res);
}
構建Parser,把Query解析成AST(抽象語法樹)
InterpreterFactory根據AST生成對應的Interpreter實例
AST是由Interpreter來解析的,執行結果是一個BlockIO,BlockIO是對 BlockInputStream 和 BlockOutputStream 的一個封裝。
服務端調用 executeQuery 來處理client發送的Query,執行后的結果保存在state這個結構體的io成員中。
每一條Query都會對應一個state結構體,記錄了這條Query的id,處理狀態,壓縮算法,Query的文本和Query所處理數據對應的IO流等元信息。
然后服務端調用 processOrdinaryQuery 等方法把輸出流結果封裝成異步的IO流,發送回client。

解析請求(Parser)
詞法分析和語法分析的核心邏輯可以在parseQuery.cpp的 tryParseQuery 中一覽無余。
該函數利用lexer掃描Query字符流,將其分割為一個個的Token, token_iterator 即一個Token流迭代器,然后parser再對Token流進行解析生成AST抽象語法樹。
// Tokenizes the query text and runs the parser over the token stream to build
// the AST (abridged excerpt). Returns the AST pointer filled in by the parser.
// NOTE(review): `pos`, `end`, `max_query_size`, `parser` and `expected` are not
// declared in this excerpt, and `parse_res` is not inspected here — presumably
// handled in the full source; confirm.
ASTPtr tryParseQuery()
{
// A Token is the basic unit produced by the lexer; lexical analysis yields a token stream.
Tokens tokens(pos, end, max_query_size);
IParser::Pos token_iterator(tokens);
ASTPtr res;
// The token stream goes through syntax analysis to produce the AST.
bool parse_res = parser.parse(token_iterator, res, expected);
return res;
}
// Top-level query parser: tries each statement-specific sub-parser in turn and
// returns true as soon as one of them succeeds.
bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
// One sub-parser per supported statement family.
ParserQueryWithOutput query_with_output_p(enable_explain);
ParserInsertQuery insert_p(end);
ParserUseQuery use_p;
ParserSetQuery set_p;
ParserSystemQuery system_p;
// `||` short-circuits, so later parsers run only if all earlier ones returned false.
bool res = query_with_output_p.parse(pos, node, expected)
|| insert_p.parse(pos, node, expected)
|| use_p.parse(pos, node, expected)
|| set_p.parse(pos, node, expected)
|| system_p.parse(pos, node, expected);
return res;
}
// Parses a SELECT statement into an ASTSelectQuery node (abridged excerpt).
// `node` is set to the newly created ASTSelectQuery.
// Returns false when the table expression fails to parse, true otherwise.
// NOTE(review): `tables`, `with_expression_list` and the other moved-from locals
// are not declared in this excerpt — presumably declared in the elided part; confirm.
bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    // Create the AST node for this SELECT and publish it through `node`.
    auto select_query = std::make_shared<ASTSelectQuery>();
    node = select_query;
    // Keywords that may appear in a SELECT statement.
    ParserKeyword s_select("SELECT");
    ParserKeyword s_distinct("DISTINCT");
    ParserKeyword s_from("FROM");
    ParserKeyword s_prewhere("PREWHERE");
    ParserKeyword s_where("WHERE");
    ParserKeyword s_group_by("GROUP BY");
    ParserKeyword s_with("WITH");
    ParserKeyword s_totals("TOTALS");
    ParserKeyword s_having("HAVING");
    ParserKeyword s_order_by("ORDER BY");
    ParserKeyword s_limit("LIMIT");
    ParserKeyword s_settings("SETTINGS");
    ParserKeyword s_by("BY");
    ParserKeyword s_rollup("ROLLUP");
    ParserKeyword s_cube("CUBE");
    ParserKeyword s_top("TOP");
    ParserKeyword s_with_ties("WITH TIES");
    ParserKeyword s_offset("OFFSET");
    //...
    // Scan the token stream for the keywords above (only table parsing is shown).
    // A failed sub-parse is propagated instead of being silently ignored.
    if (!ParserTablesInSelectQuery().parse(pos, tables, expected))
        return false;
    // Attach the parsed pieces to the AST: each clause parsed from the SQL is
    // stored as an expression slot on the select node.
    select_query->setExpression(ASTSelectQuery::Expression::WITH, std::move(with_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::SELECT, std::move(select_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::TABLES, std::move(tables));
    select_query->setExpression(ASTSelectQuery::Expression::PREWHERE, std::move(prewhere_expression));
    select_query->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
    select_query->setExpression(ASTSelectQuery::Expression::GROUP_BY, std::move(group_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::HAVING, std::move(having_expression));
    select_query->setExpression(ASTSelectQuery::Expression::ORDER_BY, std::move(order_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_OFFSET, std::move(limit_by_offset));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_LENGTH, std::move(limit_by_length));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY, std::move(limit_by_expression_list));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_OFFSET, std::move(limit_offset));
    select_query->setExpression(ASTSelectQuery::Expression::LIMIT_LENGTH, std::move(limit_length));
    select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, std::move(settings));
    // Flowing off the end of a bool function is undefined behaviour; report success explicitly.
    return true;
}

執行請求(Interpreter)
// Returns the interpreter that matches the concrete type of the AST root
// (abridged excerpt: only the ASTSelectQuery branch is shown).
// NOTE(review): no return is visible for other AST types — presumably the full
// source has further branches; confirm.
std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context & context, QueryProcessingStage::Enum stage)
{
// Example: the AST was produced from a SELECT statement.
if (query->as<ASTSelectQuery>())
{
/// This is internal part of ASTSelectWithUnionQuery.
/// Even if there is SELECT without union, it is represented by ASTSelectWithUnionQuery with single ASTSelectQuery as a child.
return std::make_unique<InterpreterSelectQuery>(query, context, SelectQueryOptions(stage));
}
}
// Constructor (abridged excerpt): runs SyntaxAnalyzer over the SELECT AST and
// then builds the expression analyzer from the analysis result.
InterpreterSelectQuery::InterpreterSelectQuery()
{
// Get the SELECT AST.
auto & query = getSelectQuery();
// Run further syntax analysis on the AST; per the article this is where the
// syntax tree is optimized and rewritten.
syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze(
query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage, NamesAndTypesList());
// Per the article, each kind of query has its own expression analyzer, which
// walks the AST to produce the execution plan (a chain of actions).
query_analyzer = std::make_unique<SelectQueryExpressionAnalyzer>(
query_ptr, syntax_analyzer_result, context,
NameSet(required_result_column_names.begin(), required_result_column_names.end()),
options.subquery_depth, !options.only_analyze);
}
// Applies a sequence of AST rewrites and optimizations to the query
// (abridged excerpt; the descriptions below are translated from the article).
// NOTE(review): no return statement is visible although the function returns
// SyntaxAnalyzerResultPtr — presumably elided; confirm.
SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze()
{
// Remove duplicate source columns.
removeDuplicateColumns(result.source_columns);
// NOTE(review): the article's comment here reads "decide whether to push predicates
// down based on the enable_optimize_predicate_expression setting", but the call
// below is replaceJoinedTable — the comment and the call do not appear to match; confirm.
replaceJoinedTable(node);
// Rewrite IN and JOIN expressions according to the distributed_product_mode setting.
InJoinSubqueriesPreprocessor(context).visit(query);
// Optimize boolean expressions inside the query.
LogicalExpressionsOptimizer().perform();
// Build a dictionary mapping aliases to AST nodes.
QueryAliasesVisitor(query_aliases_data, log.stream()).visit(query);
// Common subexpression elimination (as described by the article).
QueryNormalizer(normalizer_data).visit(query);
// Remove unneeded columns from the SELECT clause.
removeUnneededColumnsFromSelectClause(select_query, required_result_columns, remove_duplicates);
// Execute scalar subqueries and replace them with their constant results.
executeScalarSubqueries(query, context, subquery_depth);
// For SELECT statements the following optimizations are applied as well:
// Predicate pushdown.
PredicateExpressionsOptimizer(select_query, settings, context).optimize();
/// GROUP BY clause optimization.
optimizeGroupBy(select_query, source_columns_set, context);
/// Remove redundant items from the ORDER BY clause.
optimizeOrderBy(select_query);
/// Remove redundant columns from the LIMIT BY clause.
optimizeLimitBy(select_query);
/// Remove redundant columns from the USING clause.
optimizeUsing(select_query);
}
// Example of building an actions chain: WHERE is appended first, a new step is
// added, SELECT and ORDER BY are appended, and the chain is then finalized.
// NOTE(review): `analyzer` is not declared in this excerpt — presumably the
// expression analyzer built for the query; confirm.
ExpressionActionsChain chain;
analyzer.appendWhere(chain);
chain.addStep();
analyzer.appendSelect(chain);
analyzer.appendOrderBy(chain);
chain.finalize();
// Builds the execution pipeline for a SELECT (abridged excerpt): analyzes the
// expressions, fetches columns from storage, then applies WHERE, aggregation
// and DISTINCT stages to the pipeline.
// NOTE(review): `from_stage`, `sorting_info`, `filter_info`, `aggregate_overflow_row`
// and `aggregate_final` are not declared in this excerpt — confirm against the full source.
void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input)
{
// The AST of the query being executed.
auto & query = getSelectQuery();
AnalysisResult expressions;
// Physical plan: per the article, determines whether the query has WHERE,
// aggregation, HAVING, ORDER BY, LIMIT BY and similar parts.
expressions = analyzeExpressions(
getSelectQuery(),
*query_analyzer,
QueryProcessingStage::FetchColumns,
options.to_stage,
context,
storage,
true,
filter_info);
// Read data from the storage.
executeFetchColumns(from_stage, pipeline, sorting_info, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere);
// Example: each SQL clause is applied to the block-stream pipeline by its own
// function — WHERE, aggregation and DISTINCT are shown here.
executeWhere(pipeline, expressions.before_where, expressions.remove_where_filter);
executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final);
executeDistinct(pipeline, true, expressions.selected_columns);
}
// Obtains the block input streams for the required columns from the storage
// (abridged excerpt).
// NOTE(review): `required_columns`, `query_info`, `max_block_size` and `max_streams`
// are not declared in this excerpt — presumably locals of the full function; confirm.
void InterpreterSelectQuery::executeFetchColumns(
    QueryProcessingStage::Enum processing_stage, TPipeline & pipeline,
    const SortingInfoPtr & sorting_info, const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere)
{
    // Ask the storage for the block streams of the required columns.
    auto streams = storage->read(required_columns, query_info, context, processing_stage, max_block_size, max_streams);
    // NOTE(review): as excerpted, the streams returned by read() are replaced
    // unconditionally by a single NullBlockInputStream and `query_info.prewhere_info`
    // is dereferenced without a check; presumably the full source guards both
    // statements (e.g. only when no streams were returned) — confirm.
    streams = {std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns))};
    streams.back() = std::make_shared<ExpressionBlockInputStream>(streams.back(), query_info.prewhere_info->remove_columns_actions);
}
// Builds the BlockIO for an INSERT query (abridged excerpt): resolves the target
// table, wraps the output stream in AddingDefaultBlockOutputStream and returns
// it as the `out` member of a BlockIO.
BlockIO InterpreterInsertQuery::execute()
{
    // `table` is the storage-engine interface of the target table.
    StoragePtr table = getTable(query);
    BlockOutputStreamPtr out;
    // Sample block obtained for this query/table pair.
    auto query_sample_block = getSampleBlock(query, table);
    // NOTE(review): in this excerpt `out` has not been assigned before it is passed
    // in and dereferenced (`out->getHeader()`); presumably the full source first
    // assigns the table's output stream — confirm.
    out = std::make_shared<AddingDefaultBlockOutputStream>(
        out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
    // Wrap the execution result in a BlockIO.
    BlockIO res;
    res.out = std::move(out);
    // Hand the BlockIO back to the caller; the excerpt built it but never returned it.
    return res;
}
// Shared-ownership handle to a storage engine object (IStorage).
using StoragePtr = std::shared_ptr< IStorage>;

// Send loop (abridged excerpt): repeatedly reads a block from the input stream
// and sends it to the client.
// NOTE(review): no exit condition is visible here — presumably the full source
// breaks out when the stream is exhausted; confirm.
while(true){
Block block;
// Read a block of data from the IO stream.
block = async_in.read();
// Send the block of data.
sendData(block);
}
// Writes one block to the client: initializes the block output stream, writes
// the block, then calls next() on the (possibly compressed) output buffer and
// on the socket output buffer.
void TCPHandler::sendData(const Block & block)
{
// Initialize the output stream parameters for this block.
initBlockOutput(block);
// Call the BlockOutputStream's write function to put the block on the output stream.
state.block_out->write(block);
// Advance both output buffers; presumably this flushes the written data towards
// the socket — confirm against the WriteBuffer API.
state.maybe_compressed_out->next();
out->next();
}
結語

