개요
- impala-shell 소스 분석을 위한 포스트.
- 개인 연구/분석 목적으로 포스트를 작성하였으며, 참고용으로만 활용 할 것.
- Cloudera / Python 2.6.6 기반에서 설치된 impala-shell을 분석함.
- impala-shell 경로
- 메인소스 : /opt/cloudera/parcels/CDH/lib/impala-shell/impala_shell.py
- client/ data driven : /opt/cloudera/parcels/CDH/lib/impala-shell/lib/
메인 Source(impala_shell.py)
Location : /opt/cloudera/parcels/CDH/lib/impala-shell/impala_shell.py
select 처리 메소드.
- impala_shell.py - do_select()
- impala에서 select 관련 문법이 나올경우 처리 로직.
- Query beeswax 후, _excute_stmt()로 쿼리 처리.
def do_select(self, args):
"""Executes a SELECT... query, fetching all rows"""
query = self.imp_client.create_beeswax_query("select %s" % args, self.set_query_options)
return self._execute_stmt(query)
모든 쿼리문을 실행시키는 로직 메소드.
- impala_shell.py - _execute_stmt(self, query, is_insert=False)
- 클라이언트가 쿼리를 실행하면 query_handle이 즉시 반환됩니다. 이 때, 쿼리 DML이 INSERT가 아닐경우 생성기를 사용하여 스트리밍 될 때 클라이언트에서 결과를 가져옵니다. 실행 시간이 출력되고 쿼리가 아직 종료되지 않은 경우 닫힙니다.
def _execute_stmt(self, query, is_insert=False):
...
self.last_query_handle = self.imp_client.execute_query(query)
...
클라이언트 처리 Source(impala_client.py)
- Location : /opt/cloudera/parcels/CDH/lib/impala-shell/lib/impala_client.py
execute 쿼리 핸들러
- impala_client.py - execute_query(self, query)
- 상위 메소드인 impala_shell._execute_stmt에 쿼리 핸들링정보와 쿼리 상태(Error or success . etc.)를 리턴.
- _do_rpc () : 일종의 소켓 체크/ 쿼리 검증 과정.
def execute_query(self, query):
rpc_result = self._do_rpc(lambda: self.imp_service.query(query))
last_query_handle, status = rpc_result
if status != RpcStatus.OK:
raise RPCException("Error executing the query")
return last_query_handle
fetch 메소드
- impala_client.py - fetch(self, query_handle)
- 쿼리 실행 후 row 결과를 출력/ 처리하는 메소드.
- 여기서 인코딩을 확인해보자 !
def fetch(self, query_handle):
"""Fetch all the results.
This function returns a generator to create an iterable of the result rows.
"""
result_rows = []
while True:
rpc_result = self._do_rpc(
lambda: self.imp_service.fetch(query_handle, False,
self.fetch_batch_size))
result, status = rpc_result
if status != RpcStatus.OK:
raise RPCException()
result_rows.extend(result.data)
if len(result_rows) >= self.fetch_batch_size or not result.has_more
rows = [row.split('\t') for row in result_rows]
result_rows = []
yield rows
if not result.has_more:
break
Impala Service 모듈
- Location : /opt/cloudera/parcels/CDH/lib/impala-shell/gen-py/ImpalaService/ImpalaService.py
- Impala-shell에서 사용되는 인스턴스 클래스 정보를 모아놓은 모듈.
Client 클래스
class Client(beeswaxd.BeeswaxService.Client, Iface):
def __init__(self, iprot, oprot=None):
beeswaxd.BeeswaxService.Client.__init__(self, iprot, oprot)
Impala Beeswaxd 모듈
- Location : /opt/cloudera/parcels/CDH/lib/impala-shell/gen-py/beeswaxd/BeeswaxService.py
- 이름(밀랍)처럼 Query를 이어 붙인 뒤, 쿼리를 실질적으로 핸들링 하는 모듈.
Client 클래스
class Client(Iface):
def __init__(self, iprot, oprot=None):
self._iprot = self._oprot = iprot
if oprot is not None:
self._oprot = oprot
self._seqid = 0
...
def query(self, query):
"""
Submit a query and return a handle (QueryHandle). The query runs asynchronously.
Parameters:
- query
"""
self.send_query(query)
return self.recv_query()
...
def send_query(self, query):
self._oprot.writeMessageBegin('query', TMessageType.CALL, self._seqid)
args = query_args()
args.query = query
args.write(self._oprot)
self._oprot.writeMessageEnd()
self._oprot.trans.flush()
...
'Bigdata Engineering' 카테고리의 다른 글
[CDH] 클라우데라 Hadoop - Open JDK 교체(oracle jdk -> open jdk) (0) | 2020.02.14 |
---|---|
[CDH] Cloudera Manager 클라우데라 매니저 버전 업그레이드 (0) | 2020.02.10 |
[Apache Impala] c++ 을 이용하여 urldecoder UDF 작성 하기 (0) | 2020.02.07 |
[Apache Impala]임팔라에 Hive UDF를 (User Defined Function) 등록하기 (0) | 2020.02.06 |
[CDH] 클라우데라 로그 확인 - cloudera log check (0) | 2020.02.04 |