Introduction
HiveSQL lineage generally refers to the source-and-destination relationships among all tables and fields in a Hive data warehouse. Parsing it is essential: when building warehouse tables, engineers often only define SQL tasks without paying much attention to the relationships between them, and when inspecting data it is hard to trace its sources and destinations more than a level or two up or down.
With lineage in hand, you can enforce ordering rules for offline task execution, analyze data source chains, support roll-up and drill-down, and do data modeling across the warehouse.
Reference implementation: https://github.com/webgjc/sql-parser/
Approach
The most direct approach is Hive's LineageLogger hook, but using it as-is has two notable drawbacks. First, it relies on Hive's built-in ANTLR3 HQL parser; if some syntax is not supported, modifying the grammar files can have unpredictable side effects. Second, iterating, testing, and releasing a hive hook is a cumbersome process, and when something goes wrong the cause is hard to pin down.
Instead, this article implements lineage parsing directly with ANTLR4 and the Hplsql.g4 grammar that ships inside Hive, again using the visitor pattern.
Table lineage
Table lineage is the simpler of the two: for an INSERT statement, the target table is always unique, and the source tables are all the real tables referenced in the FROM clauses of its SELECTs. For example, for `INSERT INTO t SELECT ... FROM a JOIN b`, the output table is `t` and the input tables are `a` and `b`.
Field lineage
Field lineage is trickier, because every result field must be traced through each layer until it maps onto the fields of real tables. Along the way, several fields may be combined into one, one field may derive from multiple fields in a lower layer, and table aliases and field aliases add further interference.
The approach taken here is to peel off every SELECT into its own object, containing the source table (null when the source is a subselect), the SELECT fields, and the index of the parent SELECT (null for the outermost layer). After parsing, all SELECT objects are stored in an array, and each outermost field is traced back through it to its real source tables.
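Before diving into the implementation, the tracing idea can be shown with a minimal self-contained sketch (all names here are invented for illustration and differ from the classes used below): each SELECT is stored as a flat record, and resolving a result field walks from the outermost record down through child selects until it hits records backed by real tables.

```java
import java.util.*;

// Minimal illustration of the select-object idea: every SELECT is kept flat,
// and field tracing follows child selects until a real table is reached.
public class LineageSketch {

    // One record per SELECT: its key, its parent's key (null for the outermost
    // layer), the real source table (null when the source is a subselect),
    // and a map of output alias -> names of the fields it was computed from.
    static class SelectNode {
        final String id, parentId, fromTable;
        final Map<String, List<String>> items = new LinkedHashMap<>();
        SelectNode(String id, String parentId, String fromTable) {
            this.id = id; this.parentId = parentId; this.fromTable = fromTable;
        }
    }

    // Resolve one output field to "table.column" entries, recursing
    // through subselect layers until a real table is found.
    static void trace(List<SelectNode> nodes, String field, String parentId,
                      Set<String> out) {
        for (SelectNode n : nodes) {
            if (!Objects.equals(n.parentId, parentId)) continue;
            List<String> sources = n.items.get(field);
            if (sources == null) continue;
            for (String src : sources) {
                if (n.fromTable != null) {
                    out.add(n.fromTable + "." + src);   // reached a real table
                } else {
                    trace(nodes, src, n.id, out);       // descend into subselect
                }
            }
        }
    }

    // Models: INSERT ... SELECT id FROM (SELECT id1 + id2 AS id FROM t1) x
    public static Set<String> demo() {
        SelectNode outer = new SelectNode("s1", null, null);
        outer.items.put("id", Arrays.asList("id"));
        SelectNode inner = new SelectNode("s2", "s1", "t1");
        inner.items.put("id", Arrays.asList("id1", "id2"));
        Set<String> out = new TreeSet<>();
        trace(Arrays.asList(outer, inner), "id", null, out);
        return out;   // id traces back to t1.id1 and t1.id2
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running `demo()` traces the outer `id` through the subselect back to `t1.id1` and `t1.id2`, which is the same shape of result the full implementation produces.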
SHOW CODE
Table lineage
First, define the data structures (getters and setters are omitted for brevity):
```java
public class TableNameModel {
    private String dbName;
    private String tableName;

    public static String dealNameMark(String name) {
        if (name.startsWith("`") && name.endsWith("`")) {
            return name.substring(1, name.length() - 1);
        } else {
            return name;
        }
    }

    public static TableNameModel parseTableName(String tableName) {
        TableNameModel tableNameModel = new TableNameModel();
        String[] splitTable = tableName.split("\\.");
        if (splitTable.length == 2) {
            tableNameModel.setDbName(splitTable[0]);
            tableNameModel.setTableName(splitTable[1]);
        } else if (splitTable.length == 1) {
            tableNameModel.setTableName(splitTable[0]);
        }
        return tableNameModel;
    }
}

public class HiveTableLineageModel {
    private TableNameModel outputTable;
    private HashSet<TableNameModel> inputTables;
}
```
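As a quick illustration of the two helpers, here is a standalone mirror (duplicated so the example runs without the rest of the class) showing the expected behavior:

```java
// Standalone mirror of TableNameModel.dealNameMark / parseTableName,
// duplicated here only so the example runs on its own.
public class TableNameDemo {
    // Strip one pair of surrounding backticks.
    static String dealNameMark(String name) {
        if (name.startsWith("`") && name.endsWith("`")) {
            return name.substring(1, name.length() - 1);
        }
        return name;
    }

    // Split "db.table" into {db, table}; a bare table name yields {null, table}.
    static String[] parseTableName(String tableName) {
        String[] split = tableName.split("\\.");
        if (split.length == 2) {
            return split;
        }
        return new String[]{null, split[0]};
    }

    public static void main(String[] args) {
        System.out.println(dealNameMark("`order`"));             // order
        System.out.println(parseTableName("db_test.table1")[0]); // db_test
        System.out.println(parseTableName("table1")[1]);         // table1
    }
}
```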
The core of table lineage is a visitor that listens for INSERT statements and FROM clauses:
```java
public class HiveSQLTableLineage extends HplsqlBaseVisitor {

    private TableNameModel outputTable;
    private HashSet<TableNameModel> inputTables = new HashSet<>();

    @Override
    public Object visitInsert_stmt(HplsqlParser.Insert_stmtContext ctx) {
        outputTable = Optional.ofNullable(ctx)
                .map(HplsqlParser.Insert_stmtContext::table_name)
                .map(RuleContext::getText)
                .map(TableNameModel::parseTableName)
                .orElse(null);
        return super.visitInsert_stmt(ctx);
    }

    @Override
    public Object visitFrom_table_clause(HplsqlParser.From_table_clauseContext ctx) {
        Optional.ofNullable(ctx)
                .map(HplsqlParser.From_table_clauseContext::from_table_name_clause)
                .map(RuleContext::getText)
                .map(TableNameModel::parseTableName)
                .map(inputTables::add);
        return super.visitFrom_table_clause(ctx);
    }

    public HiveTableLineageModel getTableLineage() {
        HiveTableLineageModel hiveTableLineageModel = new HiveTableLineageModel();
        hiveTableLineageModel.setOutputTable(outputTable);
        hiveTableLineageModel.setInputTables(inputTables);
        return hiveTableLineageModel;
    }
}
```
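Wiring the visitor up follows the standard ANTLR4 pattern. The sketch below is illustrative only: it assumes HplsqlLexer and HplsqlParser have been generated from Hive's Hplsql.g4, and that `program` is the grammar's entry rule.

```java
// Sketch, not part of the original implementation: drive the visitor
// over a parse tree built by the ANTLR4 runtime.
CharStream input = CharStreams.fromString(sql);
HplsqlLexer lexer = new HplsqlLexer(input);              // generated from Hplsql.g4
CommonTokenStream tokens = new CommonTokenStream(lexer);
HplsqlParser parser = new HplsqlParser(tokens);          // generated from Hplsql.g4
HiveSQLTableLineage visitor = new HiveSQLTableLineage();
visitor.visit(parser.program());                         // walk the whole tree
HiveTableLineageModel lineage = visitor.getTableLineage();
```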
Field lineage
Define the structures:
```java
public class FieldNameModel {
    private String dbName;
    private String tableName;
    private String fieldName;
}

public class FieldNameWithProcessModel {
    private String dbName;
    private String tableName;
    private String fieldName;
    private String process;
}

public class HiveFieldLineageSelectItemModel {
    private Set<String> fieldNames;
    private String alias;
    private String process;
}

public class HiveFieldLineageSelectModel {
    // key of this select, built as "startIndex_tableAlias"
    String id;
    // key of the parent select; null for the outermost layer
    String parentId;
    TableNameModel fromTable;
    String tableAlias;
    List<HiveFieldLineageSelectItemModel> selectItems;
}

public class HiveFieldLineageModel {
    private FieldNameModel targetField;
    // carries the computation process along with each source field
    private HashSet<FieldNameWithProcessModel> sourceFields;
}
```
The main field-lineage pass targets INSERT statements, but a plain SELECT works as well, since the fields of the outermost SELECT are taken as the result fields. One limitation is that no intermediate layer may use SELECT *: without a connection to the metastore, there is no way to know which fields the * expands to. The computation process of each field is also recorded along the way; strictly speaking this should be a list of expressions, but here only the longest one is kept, which is admittedly fragile.
```java
public class HiveSQLFieldLineage extends HplsqlBaseVisitor {

    private TableNameModel outputTable;

    private HashMap<String, HiveFieldLineageSelectModel> hiveFieldSelects = new LinkedHashMap<>();

    private Map<Integer, String> selectParentKeyMap = new HashMap<>();

    private String thisSelectId;

    private String sourceSQL;

    private HiveFieldLineageSelectItemModel selectItemModel;
    private List<HiveFieldLineageSelectItemModel> selectFields = new ArrayList<>();
    private Boolean startSelectItem = false;

    public HiveSQLFieldLineage(String sql) {
        this.sourceSQL = sql;
    }

    // Cut out the original SQL text covered by a parse-tree node.
    private String subSourceSql(ParserRuleContext parserRuleContext) {
        return sourceSQL.substring(
                parserRuleContext.getStart().getStartIndex(),
                parserRuleContext.getStop().getStopIndex() + 1);
    }

    @Override
    public Object visitInsert_stmt(HplsqlParser.Insert_stmtContext ctx) {
        outputTable = Optional.ofNullable(ctx)
                .map(HplsqlParser.Insert_stmtContext::table_name)
                .map(RuleContext::getText)
                .map(TableNameModel::parseTableName)
                .orElse(null);
        return super.visitInsert_stmt(ctx);
    }

    @Override
    public Object visitExpr(HplsqlParser.ExprContext ctx) {
        if (startSelectItem) {
            Optional.ofNullable(ctx)
                    .map(HplsqlParser.ExprContext::expr_atom)
                    .map(HplsqlParser.Expr_atomContext::ident)
                    .map(ParseTree::getText)
                    .ifPresent(s -> {
                        if (!StringUtils.isNumeric(s)) {
                            selectItemModel.getFieldNames().add(TableNameModel.dealNameMark(s));
                        }
                    });
        }
        return super.visitExpr(ctx);
    }

    @Override
    public Object visitSelect_list_item(HplsqlParser.Select_list_itemContext ctx) {
        startSelectItem = true;
        selectItemModel = new HiveFieldLineageSelectItemModel();
        selectItemModel.setFieldNames(new HashSet<>());
        Optional.ofNullable(ctx)
                .map(HplsqlParser.Select_list_itemContext::expr)
                .map(this::subSourceSql)
                .ifPresent(selectItemModel::setProcess);
        Optional.ofNullable(ctx)
                .map(HplsqlParser.Select_list_itemContext::select_list_alias)
                .map(HplsqlParser.Select_list_aliasContext::ident)
                .map(RuleContext::getText)
                .ifPresent(selectItemModel::setAlias);
        Object visit = super.visitSelect_list_item(ctx);
        selectFields.add(selectItemModel);
        return visit;
    }

    @Override
    public Object visitFrom_clause(HplsqlParser.From_clauseContext ctx) {
        startSelectItem = false;
        HashMap<String, List<HiveFieldLineageSelectItemModel>> fieldItems = new HashMap<>();
        for (HiveFieldLineageSelectItemModel item : selectFields) {
            HashMap<String, HashSet<String>> aliasSet = new HashMap<>();
            for (String field : item.getFieldNames()) {
                String[] sp = field.split("\\.");
                if (sp.length == 2) {
                    // field qualified with a table alias
                    String key = thisSelectId + "_" + sp[0];
                    aliasSet.computeIfAbsent(key, t -> new HashSet<>());
                    aliasSet.get(key).add(sp[1]);
                } else if (sp.length == 1) {
                    // unqualified field: attribute it to every child select of this one
                    boolean flat = true;
                    for (String k : selectParentKeyMap.values()) {
                        if (k.startsWith(thisSelectId + "_")) {
                            aliasSet.computeIfAbsent(k, t -> new HashSet<>());
                            aliasSet.get(k).add(sp[0]);
                            flat = false;
                        }
                    }
                    if (flat) {
                        String key = thisSelectId + "_";
                        aliasSet.computeIfAbsent(key, t -> new HashSet<>());
                        aliasSet.get(key).add(sp[0]);
                    }
                }
            }
            for (String key : aliasSet.keySet()) {
                fieldItems.computeIfAbsent(key, k -> new ArrayList<>());
                HiveFieldLineageSelectItemModel selectItemModel = new HiveFieldLineageSelectItemModel();
                selectItemModel.setFieldNames(aliasSet.get(key));
                selectItemModel.setAlias(item.getAlias());
                selectItemModel.setProcess(item.getProcess());
                if (selectItemModel.getFieldNames().size() == 1 && selectItemModel.getAlias() == null) {
                    selectItemModel.setAlias(selectItemModel.getFieldNames().iterator().next());
                }
                fieldItems.get(key).add(selectItemModel);
            }
        }
        for (String key : fieldItems.keySet()) {
            if (hiveFieldSelects.get(key) != null) {
                hiveFieldSelects.get(key).setSelectItems(fieldItems.get(key));
            }
        }
        return super.visitFrom_clause(ctx);
    }

    @Override
    public Object visitSelect_stmt(HplsqlParser.Select_stmtContext ctx) {
        List<HplsqlParser.Fullselect_stmt_itemContext> selectItems = ctx.fullselect_stmt().fullselect_stmt_item();
        for (HplsqlParser.Fullselect_stmt_itemContext selectItem : selectItems) {
            HiveFieldLineageSelectModel hiveFieldLineageSelectModel = new HiveFieldLineageSelectModel();
            Integer thisId = selectItem.getStart().getStartIndex();
            HplsqlParser.Subselect_stmtContext subSelect = selectItem.subselect_stmt();
            HplsqlParser.From_table_name_clauseContext fromTableNameClause = Optional.ofNullable(subSelect)
                    .map(HplsqlParser.Subselect_stmtContext::from_clause)
                    .map(HplsqlParser.From_clauseContext::from_table_clause)
                    .map(HplsqlParser.From_table_clauseContext::from_table_name_clause)
                    .orElse(null);
            Optional.ofNullable(fromTableNameClause)
                    .map(HplsqlParser.From_table_name_clauseContext::table_name)
                    .map(RuleContext::getText)
                    .map(TableNameModel::parseTableName)
                    .ifPresent(hiveFieldLineageSelectModel::setFromTable);
            Optional.ofNullable(fromTableNameClause)
                    .map(HplsqlParser.From_table_name_clauseContext::from_alias_clause)
                    .map(HplsqlParser.From_alias_clauseContext::ident)
                    .map(RuleContext::getText)
                    .ifPresent(hiveFieldLineageSelectModel::setTableAlias);

            Optional.ofNullable(subSelect)
                    .map(HplsqlParser.Subselect_stmtContext::from_clause)
                    .map(HplsqlParser.From_clauseContext::from_table_clause)
                    .map(HplsqlParser.From_table_clauseContext::from_subselect_clause)
                    .map(HplsqlParser.From_subselect_clauseContext::from_alias_clause)
                    .map(RuleContext::getText)
                    .ifPresent(hiveFieldLineageSelectModel::setTableAlias);

            String alias = hiveFieldLineageSelectModel.getTableAlias();
            String thisKey = String.format("%s_%s", thisId, alias == null ? "" : alias);
            hiveFieldLineageSelectModel.setId(thisKey);
            hiveFieldLineageSelectModel.setParentId(selectParentKeyMap.get(thisId));
            hiveFieldLineageSelectModel.setSelectItems(new ArrayList<>());
            hiveFieldSelects.put(thisKey, hiveFieldLineageSelectModel);

            // register child selects of a from-subselect under this key
            Optional.ofNullable(subSelect)
                    .map(HplsqlParser.Subselect_stmtContext::from_clause)
                    .map(HplsqlParser.From_clauseContext::from_table_clause)
                    .map(HplsqlParser.From_table_clauseContext::from_subselect_clause)
                    .map(HplsqlParser.From_subselect_clauseContext::select_stmt)
                    .map(HplsqlParser.Select_stmtContext::fullselect_stmt)
                    .map(HplsqlParser.Fullselect_stmtContext::fullselect_stmt_item)
                    .ifPresent(subSelects -> subSelects.forEach(item ->
                            selectParentKeyMap.put(item.getStart().getStartIndex(), thisKey)));

            List<HplsqlParser.From_join_clauseContext> fromJoinClauses = Optional.ofNullable(subSelect)
                    .map(HplsqlParser.Subselect_stmtContext::from_clause)
                    .map(HplsqlParser.From_clauseContext::from_join_clause)
                    .orElse(new ArrayList<>());
            for (HplsqlParser.From_join_clauseContext fromJoinClauseContext : fromJoinClauses) {
                HiveFieldLineageSelectModel joinSelect = new HiveFieldLineageSelectModel();
                Optional.ofNullable(fromJoinClauseContext)
                        .map(HplsqlParser.From_join_clauseContext::from_table_clause)
                        .map(HplsqlParser.From_table_clauseContext::from_table_name_clause)
                        .map(HplsqlParser.From_table_name_clauseContext::table_name)
                        .map(RuleContext::getText)
                        .map(TableNameModel::parseTableName)
                        .ifPresent(joinSelect::setFromTable);
                Optional.ofNullable(fromJoinClauseContext)
                        .map(HplsqlParser.From_join_clauseContext::from_table_clause)
                        .map(HplsqlParser.From_table_clauseContext::from_table_name_clause)
                        .map(HplsqlParser.From_table_name_clauseContext::from_alias_clause)
                        .map(HplsqlParser.From_alias_clauseContext::ident)
                        .map(RuleContext::getText)
                        .ifPresent(joinSelect::setTableAlias);

                Optional.ofNullable(fromJoinClauseContext)
                        .map(HplsqlParser.From_join_clauseContext::from_table_clause)
                        .map(HplsqlParser.From_table_clauseContext::from_subselect_clause)
                        .map(HplsqlParser.From_subselect_clauseContext::from_alias_clause)
                        .map(RuleContext::getText)
                        .ifPresent(joinSelect::setTableAlias);

                String jalias = joinSelect.getTableAlias();
                String jkey = String.format("%s_%s", thisId, jalias == null ? "" : jalias);
                joinSelect.setId(jkey);
                joinSelect.setParentId(selectParentKeyMap.get(thisId));
                joinSelect.setSelectItems(new ArrayList<>());
                hiveFieldSelects.put(jkey, joinSelect);

                Optional.ofNullable(fromJoinClauseContext)
                        .map(HplsqlParser.From_join_clauseContext::from_table_clause)
                        .map(HplsqlParser.From_table_clauseContext::from_subselect_clause)
                        .map(HplsqlParser.From_subselect_clauseContext::select_stmt)
                        .map(HplsqlParser.Select_stmtContext::fullselect_stmt)
                        .map(HplsqlParser.Fullselect_stmtContext::fullselect_stmt_item)
                        .ifPresent(subSelects -> subSelects.forEach(item ->
                                selectParentKeyMap.put(item.getStart().getStartIndex(), jkey)));
            }
        }
        return super.visitSelect_stmt(ctx);
    }

    @Override
    public Object visitSubselect_stmt(HplsqlParser.Subselect_stmtContext ctx) {
        thisSelectId = ctx.getStart().getStartIndex() + "";
        selectFields = new ArrayList<>();
        return super.visitSubselect_stmt(ctx);
    }

    private List<HiveFieldLineageSelectModel> hiveFieldSelectList = new ArrayList<>();

    private void transSelectToList() {
        for (String key : hiveFieldSelects.keySet()) {
            hiveFieldSelectList.add(hiveFieldSelects.get(key));
        }
    }

    // The target fields are the aliases of the outermost selects (parentId == null).
    private List<FieldNameModel> getTargetFields() {
        List<List<String>> items = hiveFieldSelectList.stream()
                .filter(item -> item.getParentId() == null)
                .map(HiveFieldLineageSelectModel::getSelectItems)
                .map(fields -> fields.stream()
                        .map(HiveFieldLineageSelectItemModel::getAlias)
                        .collect(Collectors.toList()))
                .collect(Collectors.toList());
        List<String> res = new ArrayList<>();
        for (List<String> item : items) {
            res.addAll(item);
        }
        res = res.stream().distinct().collect(Collectors.toList());
        List<FieldNameModel> fieldNameModels = new ArrayList<>();
        for (String i : res) {
            FieldNameModel fieldNameModel = new FieldNameModel();
            if (outputTable != null) {
                fieldNameModel.setDbName(outputTable.getDbName());
                fieldNameModel.setTableName(outputTable.getTableName());
            }
            fieldNameModel.setFieldName(i);
            fieldNameModels.add(fieldNameModel);
        }
        return fieldNameModels;
    }

    private HashSet<FieldNameWithProcessModel> sourceFields;
    private String fieldProcess = "";

    // Recursively trace one target field down to fields of real tables.
    private void findFieldSource(String targetField, String parentId) {
        hiveFieldSelectList.forEach(select -> {
            if ((parentId == null && select.getParentId() == null)
                    || (select.getParentId() != null && select.getParentId().equals(parentId))) {
                if (select.getSelectItems() != null) {
                    if (select.getFromTable() == null) {
                        // source is a subselect: descend one layer
                        select.getSelectItems().forEach(selectItem -> {
                            if (selectItem.getAlias().equals(targetField)) {
                                if (selectItem.getProcess().length() > fieldProcess.length()) {
                                    fieldProcess = selectItem.getProcess();
                                }
                                for (String field : selectItem.getFieldNames()) {
                                    findFieldSource(field, select.getId());
                                }
                            }
                        });
                    } else {
                        // source is a real table: record the lineage
                        select.getSelectItems().forEach(selectItem -> {
                            if (selectItem.getAlias().equals(targetField)) {
                                if (selectItem.getProcess().length() > fieldProcess.length()) {
                                    fieldProcess = selectItem.getProcess();
                                }
                                for (String field : selectItem.getFieldNames()) {
                                    FieldNameWithProcessModel fieldNameWithProcessModel = new FieldNameWithProcessModel();
                                    fieldNameWithProcessModel.setDbName(select.getFromTable().getDbName());
                                    fieldNameWithProcessModel.setTableName(select.getFromTable().getTableName());
                                    fieldNameWithProcessModel.setFieldName(field);
                                    fieldNameWithProcessModel.setProcess(fieldProcess);
                                    sourceFields.add(fieldNameWithProcessModel);
                                }
                            }
                        });
                    }
                }
            }
        });
    }

    public List<HiveFieldLineageModel> getHiveFieldLineage() {
        transSelectToList();
        List<FieldNameModel> targetFields = getTargetFields();
        List<HiveFieldLineageModel> hiveFieldLineageModelList = new ArrayList<>();
        for (FieldNameModel targetField : targetFields) {
            HiveFieldLineageModel hiveFieldLineageModel = new HiveFieldLineageModel();
            hiveFieldLineageModel.setTargetField(targetField);
            sourceFields = new HashSet<>();
            fieldProcess = "";
            findFieldSource(targetField.getFieldName(), null);
            hiveFieldLineageModel.setSourceFields(sourceFields);
            hiveFieldLineageModelList.add(hiveFieldLineageModel);
        }
        return hiveFieldLineageModelList;
    }

    public HashMap<String, HiveFieldLineageSelectModel> getHiveFieldSelects() {
        return hiveFieldSelects;
    }
}
```
Results
Take a simple SQL statement:
```sql
INSERT INTO TABLE db_test.table_result
SELECT
    t1.id,
    t2.name
FROM (
    SELECT id1 + id2 AS id
    FROM db_test.table1
) t1
LEFT JOIN (
    SELECT id, name
    FROM (
        SELECT id, sourcename AS name
        FROM db_test.table2
    )
) t2 ON t1.id = t2.id
```
The parsed table lineage:
```json
{
  "inputTables": [
    {
      "dbName": "db_test",
      "tableName": "table2"
    },
    {
      "dbName": "db_test",
      "tableName": "table1"
    }
  ],
  "outputTable": {
    "dbName": "db_test",
    "tableName": "table_result"
  }
}
```
The parsed field lineage:
```json
[
  {
    "sourceFields": [
      {
        "dbName": "db_test",
        "fieldName": "id1",
        "process": "id1 + id2",
        "tableName": "table1"
      },
      {
        "dbName": "db_test",
        "fieldName": "id2",
        "process": "id1 + id2",
        "tableName": "table1"
      }
    ],
    "targetField": {
      "dbName": "db_test",
      "fieldName": "id",
      "tableName": "table_result"
    }
  },
  {
    "sourceFields": [
      {
        "dbName": "db_test",
        "fieldName": "sourcename",
        "process": "sourcename",
        "tableName": "table2"
      }
    ],
    "targetField": {
      "dbName": "db_test",
      "fieldName": "name",
      "tableName": "table_result"
    }
  }
]
```
Dependencies
```xml
<java.version>1.8</java.version>
<antlr4.version>4.7.2</antlr4.version>
```
```java
import org.antlr.v4.runtime.ParserRuleContext;
import org.antlr.v4.runtime.RuleContext;
import org.antlr.v4.runtime.tree.ParseTree;
import org.apache.commons.lang3.StringUtils;

import xxx.HplsqlBaseVisitor;
import xxx.HplsqlParser;
```