Preface
Hive leaves quite a few hooks along its execution pipeline that developers can use to build extensions. Roughly, they fire at the following points:

- when the driver runs
- before and after semantic analysis (semanticAnalyze) of the query plan
- before the query is turned into jobs
- before and after execution
- on execution failure
Below is a complete walkthrough of the hook flow inside Hive, including the corresponding configuration properties:
```
Driver.run()
  => HiveDriverRunHook.preDriverRun()                     (hive.exec.driver.run.hooks)
  => Driver.compile()
     => HiveSemanticAnalyzerHook.preAnalyze()             (hive.semantic.analyzer.hook)
     => SemanticAnalyze (QueryBlock, LogicalPlan, PhyPlan, TaskTree)
     => HiveSemanticAnalyzerHook.postAnalyze()            (hive.semantic.analyzer.hook)
     => QueryString redactor                              (hive.exec.query.redactor.hooks)
  => Authorization
  => Driver.execute()
     => ExecuteWithHookContext.run() || PreExecute.run()  (hive.exec.pre.hooks)
     => TaskRunner
     => if failed, ExecuteWithHookContext.run()           (hive.exec.failure.hooks)
     => ExecuteWithHookContext.run() || PostExecute.run() (hive.exec.post.hooks)
  => HiveDriverRunHook.postDriverRun()                    (hive.exec.driver.run.hooks)
```
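For example, the first and last steps of that flow correspond to the HiveDriverRunHook interface. As a minimal sketch (the class name LogDriverRunHook is mine, not from Hive), such a hook could look like this:

```java
import org.apache.hadoop.hive.ql.HiveDriverRunHook;
import org.apache.hadoop.hive.ql.HiveDriverRunHookContext;

// Hypothetical example: log every command around the driver run.
// Registered via: set hive.exec.driver.run.hooks=LogDriverRunHook;
public class LogDriverRunHook implements HiveDriverRunHook {
    @Override
    public void preDriverRun(HiveDriverRunHookContext hookContext) throws Exception {
        // fires before compilation, so only the raw command is available here
        System.out.println("about to run: " + hookContext.getCommand());
    }

    @Override
    public void postDriverRun(HiveDriverRunHookContext hookContext) throws Exception {
        // fires after execution has finished
        System.out.println("finished: " + hookContext.getCommand());
    }
}
```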
Lineage Parsing
The concrete example here is a hook built on Hive's own lineage analysis.

First, note that the hook mainly has to implement the run method of ExecuteWithHookContext, which takes a single HookContext argument containing almost all the information about the query.

Since what we want is lineage, we only care about the part that actually executed successfully, so the hook goes into hive.exec.post.hooks, which runs after execution and therefore sidesteps failed queries.
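As a rough illustration of how much HookContext carries (the class name is mine; the getters shown are from the HookContext API), a trivial hook could dump the query string, the read/write entities, and which hook slot fired:

```java
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;

// Hypothetical example: print a few of the things HookContext exposes.
public class InspectContextHook implements ExecuteWithHookContext {
    @Override
    public void run(HookContext hookContext) throws Exception {
        // the full query text
        System.out.println("query: " + hookContext.getQueryPlan().getQueryString());
        // tables/partitions the query reads and writes
        hookContext.getInputs().forEach(in -> System.out.println("input: " + in));
        hookContext.getOutputs().forEach(out -> System.out.println("output: " + out));
        // which slot fired: PRE_EXEC_HOOK, POST_EXEC_HOOK or ON_FAILURE_HOOK
        System.out.println("hook type: " + hookContext.getHookType());
    }
}
```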
Create a new Maven project and add a dependency on hive-exec, with the version matching your Hive installation:
```xml
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.1</version>
</dependency>
```
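A packaging note: because the hook runs inside Hive, which already has hive-exec on its classpath, this dependency can typically be marked as `<scope>provided</scope>` to keep the hook jar small.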
Straight to the code; both table-level and column-level lineage are handled in the same hook:
```java
package cn.ganjiacheng;

import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.metastore.api.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class MyLineagehook implements ExecuteWithHookContext {

    private Logger logger = LoggerFactory.getLogger(MyLineagehook.class);

    // tables the query reads from
    private Set<String> inputTables;
    // tables the query writes to
    private Set<String> outputTables;
    // column lineage: target column -> source columns it is derived from
    private Map<String, ArrayList<String>> fieldLineage;

    public MyLineagehook() {
        inputTables = new HashSet<>();
        outputTables = new HashSet<>();
        fieldLineage = new HashMap<>();
    }

    // format a table as "db.table"
    private String dealOutputTable(Table table) {
        String dbName = table.getDbName();
        String tableName = table.getTableName();
        return dbName != null ? String.format("%s.%s", dbName, tableName) : tableName;
    }

    // format a target column as "db.table.column"
    private String dealDepOutputField(LineageInfo.DependencyKey dependencyKey) {
        try {
            String tableName = dealOutputTable(dependencyKey.getDataContainer().getTable());
            String field = dependencyKey.getFieldSchema().getName();
            return String.format("%s.%s", tableName, field);
        } catch (Exception e) {
            logger.error("deal dep output field error: " + e.getMessage());
            return null;
        }
    }

    // format a source column as "db.table.column"
    private String dealBaseOutputField(LineageInfo.BaseColumnInfo baseColumnInfo) {
        try {
            String tableName = dealOutputTable(baseColumnInfo.getTabAlias().getTable());
            String field = baseColumnInfo.getColumn().getName();
            return String.format("%s.%s", tableName, field);
        } catch (Exception e) {
            logger.error("deal base output field error: " + e.getMessage());
            return null;
        }
    }

    @Override
    public void run(HookContext hookContext) {
        for (Map.Entry<LineageInfo.DependencyKey, LineageInfo.Dependency> dep : hookContext.getLinfo().entrySet()) {
            // the key side of a dependency is the output table/column
            Optional.ofNullable(dep.getKey())
                    .map(LineageInfo.DependencyKey::getDataContainer)
                    .map(LineageInfo.DataContainer::getTable)
                    .map(this::dealOutputTable)
                    .ifPresent(outputTables::add);
            // the value side holds the base columns, i.e. the input tables
            Optional.ofNullable(dep.getValue())
                    .map(LineageInfo.Dependency::getBaseCols)
                    .ifPresent(items -> items.stream()
                            .map(LineageInfo.BaseColumnInfo::getTabAlias)
                            .map(LineageInfo.TableAliasInfo::getTable)
                            .map(this::dealOutputTable)
                            .forEach(inputTables::add));
            // register the target column in the lineage map
            String column = Optional.ofNullable(dep.getKey())
                    .map(this::dealDepOutputField)
                    .map(aimField -> {
                        fieldLineage.put(aimField, new ArrayList<>());
                        return aimField;
                    }).orElse(null);
            // attach every source column to the target column
            // (guard against a target that could not be resolved)
            if (column != null) {
                Optional.ofNullable(dep.getValue())
                        .map(LineageInfo.Dependency::getBaseCols)
                        .ifPresent(items -> items.stream()
                                .map(this::dealBaseOutputField)
                                .filter(Objects::nonNull)
                                .forEach(item -> fieldLineage.get(column).add(item)));
            }
        }
        System.out.println("Input tables:");
        System.out.println(inputTables);
        System.out.println("Output tables:");
        System.out.println(outputTables);
        System.out.println("Column lineage:");
        System.out.println(fieldLineage.toString());
    }
}
```
Usage
Build the project to produce a jar file, then in the Hive CLI:
```
start hive
> hive

add the jar
> add jar xxx.jar;

set the hook
> set hive.exec.post.hooks=cn.ganjiacheng.MyLineagehook;

run an INSERT statement
```
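Setting the hook with `set` only lasts for the current session. To make it permanent, the same property could be configured in hive-site.xml instead (a sketch; the jar still has to be on Hive's classpath, e.g. via HIVE_AUX_JARS_PATH):

```xml
<property>
    <name>hive.exec.post.hooks</name>
    <value>cn.ganjiacheng.MyLineagehook</value>
</property>
```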
Demo, using the following INSERT statement:
```sql
INSERT OVERWRITE TABLE myuser_info
SELECT
    m.id AS id,
    m.name AS name,
    mp.phone AS phone,
    me.email AS email
FROM
    myuser m
LEFT JOIN (
    SELECT
        *
    FROM
        myuser_phone
) mp ON m.id = mp.id
LEFT JOIN
    myuser_email me ON m.id = me.id
```
```
Input tables:
[default.myuser_phone, default.myuser_email, default.myuser]
Output tables:
[default.myuser_info]
Column lineage:
{default.myuser_info.phone=[default.myuser_phone.phone], default.myuser_info.email=[default.myuser_email.email], default.myuser_info.id=[default.myuser.id], default.myuser_info.name=[default.myuser.name]}
```
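Note two things in the output: the subquery alias mp has been resolved back to the physical table default.myuser_phone, and the database default, never spelled out in the query, is filled in on every table and column name.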
Summary
This implementation is far more convenient than parsing the SQL yourself with ANTLR4, and needs much less code.

One advantage over raw SQL parsing is that the database selected by an earlier USE statement is automatically filled in for you in the column lineage; a standalone SQL parser has no way to know the current database.

It also handles expressions like SELECT * out of the box, because Hive expands them against the metastore, whereas a bare SQL parser has no link to the metadata.

Once the hook has extracted the lineage, it could publish the result as a message to an MQ for a downstream backend service to collect and consume; that is not covered in depth here.
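Purely as an illustration of that last hand-off (not part of the original hook), a minimal sketch assuming Kafka as the MQ; the broker address, topic name, and plain-string payload are all hypothetical:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical helper: called at the end of MyLineagehook.run() with the
// lineage serialized to a string. Requires a kafka-clients dependency.
public class LineageSender {
    public static void send(String lineagePayload) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // one message per query; a real pipeline would also attach
            // a query id, user, and timestamp
            producer.send(new ProducerRecord<>("hive-lineage", lineagePayload));
        }
    }
}
```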