# Run an HQL statement asynchronously, stream its execution logs while it
# runs, and pick the YARN application id out of those logs.
#
# Expects `cursor`, `conn`, `TOperationState` and `re` to be in scope already
# (they are created/imported outside this snippet).

# Matches the "App id application_...)" fragment of the Tez status log line
# and captures the application id.
APP_ID_PATTERN = re.compile(r'App id (.*?)\)')

application_id = None
try:
    # Submit the HQL asynchronously so the logs can be polled while it runs.
    cursor.execute('''select count(1) from table''', async_=True)

    # Every poll returns the current operation state; while the query is
    # still initializing or running, fetch the logs and print them.
    # NOTE: poll is relatively slow, so when trying this out pick a query
    # that runs for a while (e.g. one joining several tables).
    status = cursor.poll().operationState
    while status in (TOperationState.INITIALIZED_STATE,
                     TOperationState.RUNNING_STATE):
        for message in cursor.fetch_logs():
            # Print every fetched line: breaking out of this loop early would
            # silently drop the remaining lines of a batch already fetched.
            print(message)
            if application_id is None:
                found = APP_ID_PATTERN.search(message)
                if found:
                    application_id = found.group(1)
        status = cursor.poll().operationState

    # Fetch the query result; as in the original snippet the rows are not
    # used any further here.
    cursor.fetchall()
finally:
    # Release the cursor and the connection even if executing, polling or
    # fetching raised; the connection is closed even if closing the cursor
    # itself fails.
    try:
        cursor.close()
    finally:
        conn.close()
打印的日志大致如下,最后一行有applicationId,可以通过对每行进行正则匹配获取。
1 2 3 4 5 6
INFO : Tez session hasn't been created yet. Opening session
DEBUG : Adding local resource: scheme: "hdfs" host: "NameNodeHACluster" port: -1 file: "/tmp/hive/hive/_tez_session_dir/a3b39087-cf39-4850-a8a0-2e31857a9a64/hive-contrib.jar"
DEBUG : DagInfo: {"context":"Hive","description":"select count(1) from table"}
DEBUG : Setting Tez DAG access for queryId=hive_20200409162528_98663859-dda3-4f6f-a56b-618dc92b5a0c with viewAclString=*, modifyStr=souche,hive
INFO : Setting tez.task.scale.memory.reserve-fraction to 0.30000001192092896
INFO : Status: Running (Executing on YARN cluster with App id application_1584028893195_1234)
def poll(self, get_progress_update=True): """Poll for and return the raw status data provided by the Hive Thrift REST API. :returns: ``ttypes.TGetOperationStatusResp`` :raises: ``ProgrammingError`` when no query has been started .. note:: This is not a part of DB-API. """