Background
To execute HQL, you can start the Hive CLI on the cluster, or use Beeline or any other client that connects to HiveServer2 over JDBC and submits SQL; the underlying transport is the Thrift protocol.
This post demonstrates both a Python implementation and a Java implementation.
Implementation
Python
In Python, either the pyhive or the impyla library will do, and they are used in much the same way; pyhive is used in the example below.
```python
from pyhive import hive

# connect to HiveServer2 (host/port/user/password defined elsewhere)
conn = hive.connect(host=host, port=port, username=user,
                    password=password, database='default')

# get a cursor and execute the SQL
cursor = conn.cursor()
sql = "show tables"
cursor.execute(sql)

# fetch the rows and the column metadata
data = cursor.fetchall()
columns = cursor.description
```
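Since `cursor.description` follows the DB-API convention (a sequence of 7-item tuples whose first element is the column name), the header and the rows can be combined; a minimal sketch building on the snippet above:

```python
# each entry in cursor.description is a DB-API 7-tuple; index 0 is the name
col_names = [col[0] for col in columns]
rows = [dict(zip(col_names, row)) for row in data]
```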
Java
The Java implementation uses the standard java.sql.* JDBC API and additionally needs two external dependencies, hive-jdbc and hadoop-common.
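With Maven, the dependencies can be declared roughly as follows; the versions here are illustrative assumptions and should be aligned with your cluster:

```xml
<!-- versions are placeholders; match them to your Hive/Hadoop cluster -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.1.1</version>
</dependency>
```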
First, define the data structures that will hold the result:
```java
// Overall result of one SQL execution
public class HiveSqlResultModel {
    List<JSONArray> meta;   // column metadata, one entry per column
    List<JSONArray> data;   // row data, one entry per row
    Integer count;          // number of rows
    // getters/setters omitted
}

// Metadata describing a single result column
public class HiveSqlResultColumnModel {
    String name;
    String type;
    Integer precision;  // only populated for DECIMAL columns
    Integer scale;      // only populated for DECIMAL columns
    // getters/setters and toJsonArray() omitted
}
```
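The util class below calls toJsonArray() on the column model. Its body is not shown in the original, so the following is only an assumption, inferred from how the meta list is consumed:

```java
// Hypothetical body for HiveSqlResultColumnModel.toJsonArray();
// assumes JSONArray is fastjson's com.alibaba.fastjson.JSONArray
public JSONArray toJsonArray() {
    JSONArray arr = new JSONArray();
    arr.add(name);
    arr.add(type);
    arr.add(precision);  // null unless the column is DECIMAL
    arr.add(scale);      // null unless the column is DECIMAL
    return arr;
}
```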
```java
import com.alibaba.fastjson.JSONArray;  // assumed: JSONArray is fastjson's
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

public class HiveClientUtil {

    private final Logger logger = LoggerFactory.getLogger(HiveClientUtil.class);

    private static final String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";
    private static final String QUEUE = "default";

    private final String hiveUrl;
    private final String username;
    private final String password;
    private final List<String> defaultPreSql;

    public HiveClientUtil(String host, String port, String username,
                          String password, String database) {
        this.hiveUrl = String.format("jdbc:hive2://%s:%s/%s", host, port, database);
        this.username = username;
        this.password = password;
        this.defaultPreSql = new ArrayList<>();
        // always route the query to the configured Tez queue
        this.defaultPreSql.add(String.format("SET tez.queue.name=%s", QUEUE));
    }

    // run SET-style statements on the same session before the main query;
    // a plain Statement is used because the JDBC spec forbids calling
    // execute(String) on a PreparedStatement
    private void execPreSql(Statement statement, List<String> presqls) {
        presqls.forEach(presql -> {
            try {
                statement.execute(presql);
            } catch (SQLException e) {
                logger.error("pre-SQL failed: {}", presql, e);
                throw new RuntimeException("Failed to execute pre-SQL", e);
            }
        });
    }

    public HiveSqlResultModel execute(String sql) throws SQLException {
        return this.execute(sql, null);
    }

    public HiveSqlResultModel execute(String sql, List<String> presqls) throws SQLException {
        try {
            Class.forName(DRIVER_NAME);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Failed to load the Hive JDBC driver", e);
        }

        try (Connection conn = DriverManager.getConnection(hiveUrl, username, password);
             Statement statement = conn.createStatement()) {

            // pre-SQL must run on the same connection as the query itself
            execPreSql(statement, defaultPreSql);
            if (presqls != null) {
                execPreSql(statement, presqls);
            }

            ResultSet result = statement.executeQuery(sql);
            ResultSetMetaData metaData = result.getMetaData();
            int columnCount = metaData.getColumnCount();

            // column metadata: name and type, plus precision/scale for DECIMAL
            HiveSqlResultModel hiveSqlResultModel = new HiveSqlResultModel();
            List<JSONArray> metas = new ArrayList<>();
            for (int i = 1; i <= columnCount; i++) {
                HiveSqlResultColumnModel columnModel = new HiveSqlResultColumnModel();
                columnModel.setName(metaData.getColumnName(i));
                columnModel.setType(metaData.getColumnTypeName(i));
                if ("DECIMAL".equals(metaData.getColumnTypeName(i))) {
                    columnModel.setPrecision(metaData.getPrecision(i));
                    columnModel.setScale(metaData.getScale(i));
                }
                metas.add(columnModel.toJsonArray());
            }
            hiveSqlResultModel.setMeta(metas);

            // row data: every column is read back as a string
            List<JSONArray> data = new ArrayList<>();
            int count = 0;
            while (result.next()) {
                JSONArray rowData = new JSONArray();
                for (int i = 1; i <= columnCount; i++) {
                    rowData.add(result.getString(i));
                }
                data.add(rowData);
                count++;
            }
            hiveSqlResultModel.setCount(count);
            hiveSqlResultModel.setData(data);
            return hiveSqlResultModel;
        }
    }
}
```
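A hypothetical call site (host, port, and credentials are placeholders, and the usual getters are assumed on the result model):

```java
// sketch of how the util might be used; values are placeholders
HiveClientUtil client = new HiveClientUtil("hs2-host", "10000", "user", "pass", "default");
HiveSqlResultModel result = client.execute("show tables");
System.out.println(result.getCount() + " rows returned");
```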