개요

  • impala-shell 소스 분석을 위한 포스트.
  • 개인 연구/분석 목적으로 포스트를 작성하였으며, 참고용으로만 활용 할 것.
  • Cloudera / Python 2.6.6 기반에서 설치된 impala-shell을 분석함.
  • impala-shell 경로
    • 메인소스 : /opt/cloudera/parcels/CDH/lib/impala-shell/impala_shell.py
    • client/ data driven : /opt/cloudera/parcels/CDH/lib/impala-shell/lib/

 

 

메인 Source(impala_shell.py)

Location : /opt/cloudera/parcels/CDH/lib/impala-shell/impala_shell.py

select 처리 메소드.

  • impala_shell.py - do_select()
  • impala에서 select 관련 문법이 나올경우 처리 로직.
  • Query beeswax 후, _excute_stmt()로 쿼리 처리.
def do_select(self, args):
    """Executes a SELECT... query, fetching all rows"""
    query = self.imp_client.create_beeswax_query("select %s" % args, self.set_query_options)
    return self._execute_stmt(query)

 

 

모든 쿼리문을 실행시키는 로직 메소드.

  • impala_shell.py - _execute_stmt(self, query, is_insert=False)
  • 클라이언트가 쿼리를 실행하면 query_handle이 즉시 반환됩니다. 이 때, 쿼리 DML이 INSERT가 아닐경우 생성기를 사용하여 스트리밍 될 때 클라이언트에서 결과를 가져옵니다. 실행 시간이 출력되고 쿼리가 아직 종료되지 않은 경우 닫힙니다.
def _execute_stmt(self, query, is_insert=False):
...
    self.last_query_handle = self.imp_client.execute_query(query)
...

 

 

클라이언트 처리 Source(impala_client.py)

  • Location : /opt/cloudera/parcels/CDH/lib/impala-shell/lib/impala_client.py

 

execute 쿼리 핸들러

  • impala_client.py - execute_query(self, query)
  • 상위 메소드인 impala_shell._execute_stmt에 쿼리 핸들링정보와 쿼리 상태(Error or success . etc.)를 리턴.
  • _do_rpc () : 일종의 소켓 체크/ 쿼리 검증 과정.
def execute_query(self, query):
    rpc_result = self._do_rpc(lambda: self.imp_service.query(query))
    last_query_handle, status = rpc_result
    if status != RpcStatus.OK:
      raise RPCException("Error executing the query")
    return last_query_handle

 

 

fetch 메소드

  • impala_client.py - fetch(self, query_handle)
  • 쿼리 실행 후 row 결과를 출력/ 처리하는 메소드.
  • 여기서 인코딩을 확인해보자 !
  def fetch(self, query_handle):
    """Fetch all the results.
    This function returns a generator to create an iterable of the result rows.
    """
    result_rows = []
    while True:
      rpc_result = self._do_rpc(
        lambda: self.imp_service.fetch(query_handle, False,
                                       self.fetch_batch_size))

      result, status = rpc_result

      if status != RpcStatus.OK:
        raise RPCException()

      result_rows.extend(result.data)

      if len(result_rows) >= self.fetch_batch_size or not result.has_more
        rows = [row.split('\t')  for row in result_rows]
        result_rows = []
        yield rows
        if not result.has_more:
          break

 

Impala Service 모듈

  • Location : /opt/cloudera/parcels/CDH/lib/impala-shell/gen-py/ImpalaService/ImpalaService.py
  • Impala-shell에서 사용되는 인스턴스 클래스 정보를 모아놓은 모듈.

 

Client 클래스

class Client(beeswaxd.BeeswaxService.Client, Iface):
  def __init__(self, iprot, oprot=None):
    beeswaxd.BeeswaxService.Client.__init__(self, iprot, oprot)

 

Impala Beeswaxd 모듈

  • Location : /opt/cloudera/parcels/CDH/lib/impala-shell/gen-py/beeswaxd/BeeswaxService.py
  • 이름(밀랍)처럼 Query를 이어 붙인 뒤, 쿼리를 실질적으로 핸들링 하는 모듈.

 

Client 클래스

class Client(Iface):
  def __init__(self, iprot, oprot=None):
    self._iprot = self._oprot = iprot
    if oprot is not None:
      self._oprot = oprot
    self._seqid = 0
...

  def query(self, query):
    """
    Submit a query and return a handle (QueryHandle). The query runs asynchronously.

    Parameters:
     - query
    """
    self.send_query(query)
    return self.recv_query()


...

  def send_query(self, query):
    self._oprot.writeMessageBegin('query', TMessageType.CALL, self._seqid)
    args = query_args()
    args.query = query
    args.write(self._oprot)
    self._oprot.writeMessageEnd()
    self._oprot.trans.flush()

...

 

 

 

 

+ Recent posts