개요
- impala-shell 소스 분석을 위한 포스트.
- 개인 연구/분석 목적으로 포스트를 작성하였으며, 참고용으로만 활용 할 것.
- Cloudera / Python 2.6.6 기반에서 설치된 impala-shell을 분석함.
- impala-shell 경로
- 메인소스 : /opt/cloudera/parcels/CDH/lib/impala-shell/impala_shell.py
- client/ data driven : /opt/cloudera/parcels/CDH/lib/impala-shell/lib/
 
메인 Source(impala_shell.py)
Location : /opt/cloudera/parcels/CDH/lib/impala-shell/impala_shell.py
select 처리 메소드.
- impala_shell.py - do_select()
- impala에서 select 관련 문법이 나올경우 처리 로직.
- Query beeswax 후, _excute_stmt()로 쿼리 처리.
def do_select(self, args):
    """Executes a SELECT... query, fetching all rows"""
    query = self.imp_client.create_beeswax_query("select %s" % args, self.set_query_options)
    return self._execute_stmt(query)
모든 쿼리문을 실행시키는 로직 메소드.
- impala_shell.py - _execute_stmt(self, query, is_insert=False)
- 클라이언트가 쿼리를 실행하면 query_handle이 즉시 반환됩니다. 이 때, 쿼리 DML이 INSERT가 아닐경우 생성기를 사용하여 스트리밍 될 때 클라이언트에서 결과를 가져옵니다. 실행 시간이 출력되고 쿼리가 아직 종료되지 않은 경우 닫힙니다.
def _execute_stmt(self, query, is_insert=False):
...
    self.last_query_handle = self.imp_client.execute_query(query)
...
클라이언트 처리 Source(impala_client.py)
- Location : /opt/cloudera/parcels/CDH/lib/impala-shell/lib/impala_client.py
execute 쿼리 핸들러
- impala_client.py - execute_query(self, query)
- 상위 메소드인 impala_shell._execute_stmt에 쿼리 핸들링정보와 쿼리 상태(Error or success . etc.)를 리턴.
- _do_rpc () : 일종의 소켓 체크/ 쿼리 검증 과정.
def execute_query(self, query):
    rpc_result = self._do_rpc(lambda: self.imp_service.query(query))
    last_query_handle, status = rpc_result
    if status != RpcStatus.OK:
      raise RPCException("Error executing the query")
    return last_query_handle
fetch 메소드
- impala_client.py - fetch(self, query_handle)
- 쿼리 실행 후 row 결과를 출력/ 처리하는 메소드.
- 여기서 인코딩을 확인해보자 !
  def fetch(self, query_handle):
    """Fetch all the results.
    This function returns a generator to create an iterable of the result rows.
    """
    result_rows = []
    while True:
      rpc_result = self._do_rpc(
        lambda: self.imp_service.fetch(query_handle, False,
                                       self.fetch_batch_size))
      result, status = rpc_result
      if status != RpcStatus.OK:
        raise RPCException()
      result_rows.extend(result.data)
      if len(result_rows) >= self.fetch_batch_size or not result.has_more
        rows = [row.split('\t')  for row in result_rows]
        result_rows = []
        yield rows
        if not result.has_more:
          break
Impala Service 모듈
- Location : /opt/cloudera/parcels/CDH/lib/impala-shell/gen-py/ImpalaService/ImpalaService.py
- Impala-shell에서 사용되는 인스턴스 클래스 정보를 모아놓은 모듈.
Client 클래스
class Client(beeswaxd.BeeswaxService.Client, Iface):
  def __init__(self, iprot, oprot=None):
    beeswaxd.BeeswaxService.Client.__init__(self, iprot, oprot)
Impala Beeswaxd 모듈
- Location : /opt/cloudera/parcels/CDH/lib/impala-shell/gen-py/beeswaxd/BeeswaxService.py
- 이름(밀랍)처럼 Query를 이어 붙인 뒤, 쿼리를 실질적으로 핸들링 하는 모듈.
Client 클래스
class Client(Iface):
  def __init__(self, iprot, oprot=None):
    self._iprot = self._oprot = iprot
    if oprot is not None:
      self._oprot = oprot
    self._seqid = 0
...
  def query(self, query):
    """
    Submit a query and return a handle (QueryHandle). The query runs asynchronously.
    Parameters:
     - query
    """
    self.send_query(query)
    return self.recv_query()
...
  def send_query(self, query):
    self._oprot.writeMessageBegin('query', TMessageType.CALL, self._seqid)
    args = query_args()
    args.query = query
    args.write(self._oprot)
    self._oprot.writeMessageEnd()
    self._oprot.trans.flush()
...
'Bigdata Engineering' 카테고리의 다른 글
| [CDH] 클라우데라 Hadoop - Open JDK 교체(oracle jdk -> open jdk) (0) | 2020.02.14 | 
|---|---|
| [CDH] Cloudera Manager 클라우데라 매니저 버전 업그레이드 (0) | 2020.02.10 | 
| [Apache Impala] c++ 을 이용하여 urldecoder UDF 작성 하기 (0) | 2020.02.07 | 
| [Apache Impala]임팔라에 Hive UDF를 (User Defined Function) 등록하기 (0) | 2020.02.06 | 
| [CDH] 클라우데라 로그 확인 - cloudera log check (0) | 2020.02.04 |