개요

  • 임팔라는 hive에서 쓰던 java UDF 뿐만 아니라 C++로 UDF가 등록 가능합니다.
  • Hive에서 쓰던 UDF를 Impala에 사용하면 구글에 오픈소스가 많을 뿐더러, Java기반이기에 친숙하여 손쉽게 등록이 가능합니다. 단, 문제점이 하나 있는데, python(내부적으로 unicode 사용) 기반의 impala와 한글 호환이 정상적으로 이루어지지 않는것입니다.

1. Impala UDF 개발을 위한 환경 패키지 구축

  • Impala에 정상적으로 UDF 등록을 위해서는 선행작업 - 즉 개발을 위한 환경패키지를 구축해야합니다.
  • Version 정보를 정확하게 파악 후 진행하여줍니다.

http://archive.cloudera.com/cdh5 에서 버전에 맞는 impala-udf-devel 패키지를 다운받아 줍니다.

  • 제가 개발을 진행하는 DMP는 centos 6(Red Hat 계열 리눅스) / CDH 5.7.6 환경입니다.
#임팔라데몬이 설치된 데이터노드 접속
cd ~/stage
wget http://archive.cloudera.com/cdh5/redhat/6/x86_64/cdh/5.7.6/RPMS/x86_64/impala-udf-devel-2.5.0+cdh5.7.6+0-1.cdh5.7.6.p0.7.el6.x86_64.rpm

다운받은 rpm 패키지 설치.

# 기 설치 여부 확인
sudo rpm -qa | grep impala
>> 결과 없음.

# rpm 설치
sudo rpm -ivh impala-udf-devel-2.5.0+cdh5.7.6+0-1.cdh5.7.6.p0.7.el6.x86_64.rpm

# 설치 확인
rpm -qa | grep impala

# 설치된 패키지 경로 확인
rpm -ql impala-udf-devel-2.5.0+cdh5.7.6+0-1.cdh5.7.6.p0.7.el6.x86_64

2. impala UDF 샘플 코드 환경 구축.

  • 1번 과정을 통해 impala udf 개발을 위한 환경을 구축하였습니다.
  • rpm으로 설치된 패키지중 /usr/include/impala_udf/udf.h 는 가장 중요한 역할을합니다.
    • 스칼라 UDF를 작성하는 데 필요한 기본 선언이 명시됨.
    • AddUdf () 를 사용하며, UDF 작성 후 컴파일에 꼭 필요한 메소드.
  • 기본적인 개발 환경 구축을 하였으니, 컴파일이 바로 가능한 샘플 코드를 다운로드 해줍니다.

CDH 공식 github에서 impala UDF 샘플 코드 다운.

github 주소 : https://github.com/cloudera/impala-udf-samples

# 깃헙 클론
git clone https://github.com/cloudera/impala-udf-samples.git

3. impala UDF C++ 소스 작성.

  • 샘플 소스를 다운받은 뒤, CMakeLists.txt 에서 빌드할 소스를 자유롭게 등록 할 수 있습니다.
  • 우선, 소스목록편집/빌드 전에 C++ 기반의 소스를 작성해야합니다

헤더소스 url-coding.h (url decode 소스)

cd /home1/username/stage/impala-udf-samples
vi url-coding.h
#ifndef URLTOOL_UDF_H
#define URLTOOL_UDF_H

#include <impala_udf/udf.h>
#include 

using namespace impala_udf;
using namespace std;


StringVal UrlDecoder(FunctionContext* context, const StringVal& arg1);

StringVal UrlEncoder(FunctionContext* context, const StringVal& arg1);

std::string UrlEncode(const std::string& str);

std::string UrlDecode(const std::string& str);

unsigned char FromHex(unsigned char x);

unsigned char ToHex(unsigned char x);

#endif

메인소스 udf-urltool.cc (url decode 소스)

vi udf-urltool.cc
#include "udf-urltool.h"

#include 
#include 
#include 
#include 
using namespace std;


StringVal UrlDecoder(FunctionContext* context, const StringVal& arg1){
    if (arg1.is_null) return StringVal::null();

    try {
        std::string original((const char *)arg1.ptr,arg1.len);
        std::string decoded = UrlDecode(original);

        StringVal result(context, decoded.size());
        memcpy(result.ptr, decoded.c_str(), decoded.size());

        return result;
    }catch(...){
        return StringVal::null();
    }

}

StringVal UrlEncoder(FunctionContext* context, const StringVal& arg1){
    if (arg1.is_null) return StringVal::null();

    try {
        std::string original((const char *)arg1.ptr,arg1.len);
        std::string encoded = UrlEncode(original);

        StringVal result(context, encoded.size());
        memcpy(result.ptr, encoded.c_str(), encoded.size());

        return result;
    }catch(...){
        return StringVal::null();
    }
}




inline unsigned char ToHex(unsigned char x) {
    return  x > 9 ? x + 55 : x + 48;
}

inline unsigned char FromHex(unsigned char x) {
    unsigned char y;
    if (x >= 'A' && x <= 'Z') y = x - 'A' + 10;
    else if (x >= 'a' && x <= 'z') y = x - 'a' + 10;
    else if (x >= '0' && x <= '9') y = x - '0';
    else y = ' ';
    return y;
}

inline std::string UrlEncode(const std::string& str) {
    std::string strTemp = "";
    size_t length = str.length();
    for (size_t i = 0; i < length; i++)
    {
        if (isalnum((unsigned char)str[i]) ||
            (str[i] == '-') ||
            (str[i] == '_') ||
            (str[i] == '.') ||
            (str[i] == '~'))
            strTemp += str[i];
        else if (str[i] == ' ')
            strTemp += "+";
        else
        {
            strTemp += '%';
            strTemp += ToHex((unsigned char)str[i] >> 4);
            strTemp += ToHex((unsigned char)str[i] % 16);
        }
    }
    return strTemp;
}

inline std::string UrlDecode(const std::string& str) {
    std::string strTemp = "";
    size_t length = str.length();
    for (size_t i = 0; i < length; i++)
    {
        if (str[i] == '+') strTemp += ' ';
        else if (str[i] == '%')
        {
            unsigned char high = FromHex((unsigned char)str[++i]);
            unsigned char low = FromHex((unsigned char)str[++i]);
            strTemp += high*16 + low;
        }
        else strTemp += str[i];
    }
    return strTemp;
}

CMakeLists.txt 수정

  • 위 에서 설명한 /usr/include/impala_udf/udf.h 헤더를 이용하여 컴파일을 할 목록을 추려내는 CMakeLists.txt를 수정하여줍니다.
vi CMakeLists.txt
# Copyright 2012 Cloudera Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cmake_minimum_required(VERSION 2.6)

# where to put generated libraries
set(LIBRARY_OUTPUT_PATH "build")
# where to put generated binaries
set(EXECUTABLE_OUTPUT_PATH "build")

find_program(CLANG_EXECUTABLE clang++)

SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -ggdb")

# Function to generate rule to cross compile a source file to an IR module.
# This should be called with the .cc src file and it will generate a
# src-file-ir target that can be built.
# e.g. COMPILE_TO_IR(test.cc) generates the "test-ir" make target.
# Disable compiler optimizations because generated IR is optimized at runtime
set(IR_COMPILE_FLAGS "-emit-llvm" "-c")
function(COMPILE_TO_IR SRC_FILE)
  get_filename_component(BASE_NAME ${SRC_FILE} NAME_WE)
  set(OUTPUT_FILE "build/${BASE_NAME}.ll")
  add_custom_command(
    OUTPUT ${OUTPUT_FILE}
    COMMAND ${CLANG_EXECUTABLE} ${IR_COMPILE_FLAGS} ${SRC_FILE} -o ${OUTPUT_FILE}
    DEPENDS ${SRC_FILE})
  add_custom_target(${BASE_NAME}-ir ALL DEPENDS ${OUTPUT_FILE})
endfunction(COMPILE_TO_IR)

# Build the UDA/UDFs into a shared library.
add_library(udfurltool SHARED udf-urltool.cc)


# Custom targest to cross compile UDA/UDF to ir

if (CLANG_EXECUTABLE)
  COMPILE_TO_IR(udf-urltool.cc )
endif(CLANG_EXECUTABLE)


# This is an example of how to use the test harness to help develop UDF and UDAs.

target_link_libraries(udfurltool ImpalaUdf)

4. 컴파일 & HDFS 업로드

  • 위에서 url decode를 위한 헤더파일과 C++ 소스를 작성하였습니다.
  • CMakeLists.txt 에서 빌드할 소스목록도 편집하였습니다.
  • 소스 작성과 리스트를 편집하였으니, 빌드 후 HDFS에 업로드를 해야합니다.

build 파일 제작(컴파일)

cd /home1/username/stage/impala-udf-samples
cmake .
make

HDFS에 소켓 업로드

cd build
hadoop fs -put libudfurltool.so /tmp/

5. impala에 udf 등록

  • HDFS에 빌드한 소켓 업로드를 완료 하였으니, impala UDF를 등록 하여줍니다.
  • UDF 등록에 관한 자세한 내용은 [링크] 참조 부탁드립니다.

등록

impala-shell
...
[hdfs.datanode.host:21000] > 
create function urldecode_test (string) returns string location 'hdfs://hdfs.namenode.host:8020/tmp/libudfurltool.so' symbol='UrlDecoder';

테스트

select default.urldecode_test(encoded_text) as decoded_text from db.table;

>>>
+--------------+
| decoded_text |
+--------------+
| 테스트1   |
| 테스트2   |
| 테스트3   |
| 테스트4   |
| 테스트5   |
| 테스트6   |
| 테스트7   |
+----------+

:: reference

https://www.cloudera.com/documentation/enterprise/5-8-x/topics/impala\_udf.html
https://data-flair.training/blogs/impala-udf/

impala 에 hive udf를 등록하기 위한 가이드 포스트.

임팔라에 하이브-HIVE UDF 등록

 

1. java로 쓰여진 Hive UDF 파일을 준비.

  • Location : hdfs://my.namenode.com:8020/user/username/lib/my-udf.jar

 

2. impala-shell 실행

impala-shell

 

3. impala-shell에서 함수 등록

  • hive와 다르게 function parameter와 return 타입을 명시해줘야 합니다.
  • location 경로는 local 경로가 아닌 hdfs 경로로 지정해주셔야 합니다.

     

    create function my_urldecode(string, string) returns string location 'hdfs://my.namenode.com:8020/user/username/lib/my-udf.jar' symbol='com.my.udf.MyURLDecode'

     

impala shell 기타 명령어

  • drop function

    DROP FUNCTION my_urldecode(STRING, STRING);
  • show function

    show functions;

 

UDF 등록후 impala에서 데이터 조회시 정상적이지 않은 값 return 시

  • impala는 유니코드 기반의 python이고, hive udf는 java기반이기 때문에 한글에 있어서 인코딩 문제가 발생함.
  • 이 내용은 다음 포스트에서 다루도록 하겠습니다

Server log:

tail -f /var/log/cloudera-scm-server/cloudera-scm-server.log

Agent log:

tail -f /var/log/cloudera-scm-agent/cloudera-scm-agent.log

or

tail -f /var/log/messages
 

개요

  • impala-shell 소스 분석을 위한 포스트.
  • 개인 연구/분석 목적으로 포스트를 작성하였으며, 참고용으로만 활용 할 것.
  • Cloudera / Python 2.6.6 기반에서 설치된 impala-shell을 분석함.
  • impala-shell 경로
    • 메인소스 : /opt/cloudera/parcels/CDH/lib/impala-shell/impala_shell.py
    • client/ data driven : /opt/cloudera/parcels/CDH/lib/impala-shell/lib/

 

 

메인 Source(impala_shell.py)

Location : /opt/cloudera/parcels/CDH/lib/impala-shell/impala_shell.py

select 처리 메소드.

  • impala_shell.py - do_select()
  • impala에서 select 관련 문법이 나올경우 처리 로직.
  • Query beeswax 후, _excute_stmt()로 쿼리 처리.
def do_select(self, args):
    """Executes a SELECT... query, fetching all rows"""
    query = self.imp_client.create_beeswax_query("select %s" % args, self.set_query_options)
    return self._execute_stmt(query)

 

 

모든 쿼리문을 실행시키는 로직 메소드.

  • impala_shell.py - _execute_stmt(self, query, is_insert=False)
  • 클라이언트가 쿼리를 실행하면 query_handle이 즉시 반환됩니다. 이 때, 쿼리 DML이 INSERT가 아닐경우 생성기를 사용하여 스트리밍 될 때 클라이언트에서 결과를 가져옵니다. 실행 시간이 출력되고 쿼리가 아직 종료되지 않은 경우 닫힙니다.
def _execute_stmt(self, query, is_insert=False):
...
    self.last_query_handle = self.imp_client.execute_query(query)
...

 

 

클라이언트 처리 Source(impala_client.py)

  • Location : /opt/cloudera/parcels/CDH/lib/impala-shell/lib/impala_client.py

 

execute 쿼리 핸들러

  • impala_client.py - execute_query(self, query)
  • 상위 메소드인 impala_shell._execute_stmt에 쿼리 핸들링정보와 쿼리 상태(Error or success . etc.)를 리턴.
  • _do_rpc () : 일종의 소켓 체크/ 쿼리 검증 과정.
def execute_query(self, query):
    rpc_result = self._do_rpc(lambda: self.imp_service.query(query))
    last_query_handle, status = rpc_result
    if status != RpcStatus.OK:
      raise RPCException("Error executing the query")
    return last_query_handle

 

 

fetch 메소드

  • impala_client.py - fetch(self, query_handle)
  • 쿼리 실행 후 row 결과를 출력/ 처리하는 메소드.
  • 여기서 인코딩을 확인해보자 !
  def fetch(self, query_handle):
    """Fetch all the results.
    This function returns a generator to create an iterable of the result rows.
    """
    result_rows = []
    while True:
      rpc_result = self._do_rpc(
        lambda: self.imp_service.fetch(query_handle, False,
                                       self.fetch_batch_size))

      result, status = rpc_result

      if status != RpcStatus.OK:
        raise RPCException()

      result_rows.extend(result.data)

      if len(result_rows) >= self.fetch_batch_size or not result.has_more
        rows = [row.split('\t')  for row in result_rows]
        result_rows = []
        yield rows
        if not result.has_more:
          break

 

Impala Service 모듈

  • Location : /opt/cloudera/parcels/CDH/lib/impala-shell/gen-py/ImpalaService/ImpalaService.py
  • Impala-shell에서 사용되는 인스턴스 클래스 정보를 모아놓은 모듈.

 

Client 클래스

class Client(beeswaxd.BeeswaxService.Client, Iface):
  def __init__(self, iprot, oprot=None):
    beeswaxd.BeeswaxService.Client.__init__(self, iprot, oprot)

 

Impala Beeswaxd 모듈

  • Location : /opt/cloudera/parcels/CDH/lib/impala-shell/gen-py/beeswaxd/BeeswaxService.py
  • 이름(밀랍)처럼 Query를 이어 붙인 뒤, 쿼리를 실질적으로 핸들링 하는 모듈.

 

Client 클래스

class Client(Iface):
  def __init__(self, iprot, oprot=None):
    self._iprot = self._oprot = iprot
    if oprot is not None:
      self._oprot = oprot
    self._seqid = 0
...

  def query(self, query):
    """
    Submit a query and return a handle (QueryHandle). The query runs asynchronously.

    Parameters:
     - query
    """
    self.send_query(query)
    return self.recv_query()


...

  def send_query(self, query):
    self._oprot.writeMessageBegin('query', TMessageType.CALL, self._seqid)
    args = query_args()
    args.query = query
    args.write(self._oprot)
    self._oprot.writeMessageEnd()
    self._oprot.trans.flush()

...

 

 

 

 

+ Recent posts