Presto

 

*  Jvm.config

                     -Xmx448G

*  Config.properties

                     query.max-memory= 10752GB 

                     # value is set more than 42% of physical memory (448 * 0.42 = 188)

                     query.max-memory-per-node= 188GB

                     # value of this parameter should be greater than query.max-memory-per-node (448 * 0.50 = 224)

                     query.max-total-memory-per-node= 224GB

 

각 설정값 설명 :  

*  Jvm.config

-Xmx : 코디네이터/ 워커가 jvm에 올리는 최대 힙 메모리입니다. 아래설정값들은 Xmx 값을 기준으로 설정되어야합니다. 서버의 리소스가 많이부족하면 Xmx를 낮추고, 성능향상을 위해서는 Xmx값을 상승시켜 줍니다.

*  Config.properties

query.max-memory : 프레스토 상에서 구동되는 모든 쿼리의 메모리의 최대값입니다. 아래  query.max-total-memory-per-node 설정값 * 총 워커노드수로 계산하면 됩니다.

 query.max-memory-per-node :  워커노드에 단일 쿼리당 최대로 할당할 수 있는 메모리 값입니다. 아래 query.max-total-memory-per-node 의 42%로 지정하시면 됩니다.

query.max-total-memory-per-node= 프레스토 워커가 쿼리구동을 위해 최대로 가져갈 수 있는 메모리의 최대값입니다.Xmx의 50%수준이 적당합니다. 이 파라미터는   query.max-memory-per-node 값 보다 무조건 높아야합니다

2.2. kafka 모니터링

2.2.1. 토픽 스냅샷 활용

Kafka는 broker를 통해서 중재되는 pub-sub (발급-구독) 모델의 메세지 큐 입니다. 
발급자(producer)가 데이터를 보내줄때 남기는 로그를 관찰하거나, consumer를 통해 적재된 데이터를 확인하여 모니터링을 할 수 있습니다.
하지만 앞선 방법들로는 토픽의 실시간 흐름을 확인하기 어렵습니다. 
카프카내에서 흘러가는 데이터를 뜰채로 건져(스냅샷) 컬럼값을 파싱 한 후, 현재시간과 비교 한다면 아주 좋은 모니터링이 될 수 있습니다. 

 

[KAFKA TOPIC 리스트 확인 하는법]     

kafka의 모든 토픽 리스트들은 아래 명령어를 통해 확인 할 수 있습니다.

  • CLI )
    /usr/lib/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper1.localhost.com:2181,zookeeper2.localhost.com:2181,zookeeper3.localhost.com:2181
  • 결과) 토픽 리스트 반환
    userTopic_1
    userTopic_2 

    ..
    userTopic_K

[KAFKA Console Consumer sh 을 활용하여 topic 정보 확인하는법]

kafka-console-consumer.sh은 kafka topic에 있는 데이터를 consume 가능한 쉘입니다. 
sh을 통해 커맨드를 날려도 걱정마세요! sh 이름이 consumer라고해서 실제로 소비되어 없어지는 것은 아닙니다 :P
kafka는 consumer group과 offset 을 통해 데이터가 전송되니깐요 :)
 
참고로, kafka에 저장된 데이터는 운영자가 설정한 기간만큼 저장되었다가 삭제됩니다.

아래명령어를 사용한다면 현재 카프카에 저장된 토픽 userTopic_K 정보를 확인 할 수 있습니다.
--max-messages 옵션을 사용하면 메세지 수를 조절할 수 있고,  --from-beginning 을 통해 시작 위치를 지정할 수 있습니다

  • CLI)
    /usr/lib/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper1.localhost.com:2181,zookeeper2.localhost.com:2181,zookeeper3.localhost.com:2181 --topic userTopic_K --max-messages N (--from-beginning)



  • 결과) kafka를 통해 전송되는 data 출력
    row1, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    row2, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    row3, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    row4, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    row5, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    row6, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    ..

    rowN, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM

    Processed a total of N messages
 
 
 

 

[실전 적용]

위에서 설명한 CLI sh파일을 통해 간단한 모니터링 쉘을 작성해 봅시다

vi kafka_topic_monit_test.sh

 


#topic 리스트를 배열로 받아오기
topiclist=`/usr/lib/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper1.localhost.com:2181,zookeeper2.localhost.com:2181,zookeeper3.localhost.com:2181`
 
#consumer.sh를 사용하여 topic 데이터를 뜰채로 1개만 건지자
topic=`/usr/lib/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper1.localhost.com:2181,zookeeper2.localhost.com:2181,zookeeper3.localhost.com:2181 --topic ${topiclist[2]} --max-messages 1 `
 
#topic 내 5번 6번 컬럼 변수저장
col5=`echo $topic | cut -d',' -f 5`
col6=`echo $topic | cut -d',' -f 6`
 
col5_date=`date -d"$col5" "+%Y-%m-%d %H:%M:%S"`
col6_send_date=`date -d"$col6" "+%Y-%m-%d %H:%M:%S"`
 
echo ${topiclist[2]} 토픽 데이터의 클러스터 적재 일자는 $col5_date 입니다
echo ${topiclist[2]} 토픽 데이터의 producer 송신 일자는 $col6_send_date 입니다
echo 현재시간은  `date  "+%Y-%m-%d %H:%M:%S"` 입니다

 

  • 실행 결과 
    아래 결과를 보면 클러스터 적재하자마자 바로 producer를 통해 kafka로 전송되었네요.
    producer 송신일자와 현재시간을 비교해보았을때, 약 카프카 내에서 약 5분의 딜레이가 나는것을 볼 수 있습니다.
$ ./kafka_topic_monit_test.sh

userTopic_K 토픽 데이터의 클러스터 적재 일자는 2021-12-14 03:04:53 입니다
userTopic_K 토픽 데이터의 producer 송신 일자는 2021-12-14 03:04:53 입니다
현재시간은 2021-12-14 03:09:13 입니다

 

2.2.2. 파티션의 오프셋 활용

kafka의 offset 정보를 활용하는것도 모니터링의 아주 좋은 예 입니다.


KT NexR의 대표적인 실시간 빅데이터 솔루션인 린스트림은 kafka 의 offset 정보를 zookeeper에 node id별로 저장하여 관리 하고있습니다.

zookeeper에는 해당 app node의 토픽 정보들이 담겨있습니다. 각 파티션별로 offset의 상태역시 확인이 가능합니다.
파티션별 토픽 offset 정보를 기록하고 일정주기로 추이를 관찰한다면, 시간별 처리량 역시 확인이 가능합니다.

아래에서 실질적 컨슈머인 spark의 node id ( app1-node-3)의 추이를 관찰해 봅시다.

 

[consumer의 offset을 확인하는 법]

  • CLI)
    /usr/lib/zookeeper/bin/zkCli.sh -server localhost get /lean/app/offsets/app1-node-3


  • 결과) app1-node-3 의 offset 정보 출력


Connecting to localhost
WATCHER::WatchedEvent state:SyncConnected type:None path:null
5:16497745458,7:16497813566,1:16497415976,4:16497559539,8:16497739374,3:16497572711,2:16497153812,6:16497762636,0:16497555877,9:16497733103
cZxid = 0x1800e40cde
ctime = Wed Aug 18 17:48:32 KST 2021
mZxid = 0x1a000a559a
mtime = Tue Dec 14 03:34:07 KST 2021
pZxid = 0x1800e40cde
cversion = 0
dataVersion = 168546
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 139
numChildren = 0

[실전 적용]

위에서 설명한 CLI를 활용하여 일정주기 (1분단위)로 partition의 offset을 추적하는 sh을 작성하여 봅시다.

#!/bin/bash
 
function offsetCount (){
   ##  파티션:offset구조의 array를 매개변수로 넣음
   info=(${!1})
   waitInfo=(${!2})
   sum=0
 
   ## 각 파티션별 1분뒤 offset값의 차를 더해서 마지막 sum에 전달
   for (( i=0 ; i < ${#info[@]} ; i++));
   do
      val=0
      info_index=`expr index "${info[$i]}" ':'`
      waitinfo_index=`expr index "${waitInfo[$i]}" ':'`
 
      partition=${info[$i]:0:`expr $info_index - 1`}
      value=${info[$i]:$info_index}
      #echo "partition:$partition"
      #echo "value:$value"
      waitPartition=${waitInfo[$i]:0:`expr $waitinfo_index - 1`}
      waitValue=${waitInfo[$i]:$waitinfo_index}
      #echo "waitpartition:$waitPartition"
      #echo "value:$waitValue"
      if [ $partition == $waitPartition ]
      then
        val=`expr $waitValue - $value`
        sum=`expr $sum + $val`
      fi
   done
   echo $sum
 
}
 
declare -A var       ## -A 옵션은 string 타입의 인덱스 사용할수 있다.
declare -A afterVar
declare -a count     ## -a 옵션은 int 타입의 인덱스 사용할수 있다.
 
#구분자를 ,로 변경 (offset변수의 결과값에서 ,가 띄어쓰기로 변경된다.)
OLD_IFS=$IFS
IFS=,
 
appid="app1-node-3"
 
#오프셋 정보를담은 텍스트를 offset 변수에 저장
offset=`/usr/lib/zookeeper/bin/zkCli.sh -server localhost get /lean/app/offsets/${appid} | sed -n '6p'`
var+=( ["${appid}"]="${offset}" )
 
#다음 offset 체크를위해 60초 대기
sleep 60
 
#60초 후의 offset 정보를 변수에 저장
afterOffset=`/usr/lib/zookeeper/bin/zkCli.sh -server localhost get /lean/app/offsets/${appid} | sed -n '6p'`
afterVar+=( ["${appid}"]="${afterOffset}" )
 
#offset과 afterOffset의 변화량을 모두합산하여준다.
result=$(offsetCount var[${appid}][@] afterVar[${appid}][@])
count+=( "$result" )
 
echo $appid 의 1분동안 처리량 : $count
 
exit 0



출력 결과  ) 

$ ./checkOffset_test.sh
app1-node-3 의 1분동안 처리량 : 20021

20021개로 출력되는것을 볼 수 있다

 

2.2.3. Producer Agent의 Hang 체크

 

[Producer Agent는 왜 행이걸릴까?]

보통 데이터 적재가 안되면 "consumer group에서 행이걸린거겠지..?" 생각하고 consumer app과 KAFKA데몬만 재기동하는 경우가 많습니다. (사실 이 방법이 가장 직설적이며 간단한 방법이지요.)

클러스터는 상황에따라 데이터량이 갑작스럽게 증가 하는 경우가 많습니다.
예측하지 못한 상황에 의해 producer가 갑자기 많은양의 데이터를 감당하지 못하고 Hang이 걸리는 경우가 발생 할 수 있습니다. 

그렇기 때문에 kafka에 데이터를 보내 주는 producer 역시도 모니터링이 필요합니다.
데이터 전송상태를 출력하는 클래스를 Producer Agent에 구현 하는 것 도 하나의 모니터링 방법이 될 수 있습니다.

아래에서는 조금 색다른 방법의 모니터링을 보여드리겠습니다.

 

[실전 적용]

자, 여기 gc.log를 남기는 producer역할의 Agent 가 있습니다.

이 Agent는 Java기반의 프로그램이며, GC또한 존재합니다.

 

jvm 설정을 통해 출력하는 gc.log의 형태는 아래와 같습니다.

gc.log file 명 : gc.agent.log

2021-12-15T04:57:28.602+0900: 307125.450: [GC [PSYoungGen: 292374K->18933K(316928K)] 9972894K->9702027K(10058240K), 0.0203490 secs] [Times: user=0.29 sys=0.00, real=0.02 secs]
2021-12-15T04:57:34.741+0900: 307131.590: [GC [PSYoungGen: 293877K->23686K(316416K)] 9976971K->9718670K(10057728K), 0.0261380 secs] [Times: user=0.29 sys=0.02, real=0.02 secs]
2021-12-15T04:57:41.501+0900: 307138.350: [GC [PSYoungGen: 298630K->27418K(298496K)] 9993614K->9725402K(10039808K), 0.0345700 secs] [Times: user=0.27 sys=0.01, real=0.03 secs]
2021-12-15T04:57:47.930+0900: 307144.779: [GC [PSYoungGen: 298266K->27540K(294400K)] 9996250K->9728638K(10035712K), 0.0259060 secs] [Times: user=0.27 sys=0.00, real=0.03 secs]
2021-12-15T04:57:52.472+0900: 307149.320: [GC [PSYoungGen: 294292K->24043K(286720K)] 9995390K->9729680K(10028032K), 0.0279990 secs] [Times: user=0.36 sys=0.01, real=0.03 secs]
2021-12-15T04:57:58.552+0900: 307155.401: [GC [PSYoungGen: 286694K->25312K(284160K)] 9992330K->9739092K(10025472K), 0.0286700 secs] [Times: user=0.33 sys=0.01, real=0.03 secs]
2021-12-15T04:58:04.579+0900: 307161.428: [GC [PSYoungGen: 283861K->24093K(287232K)] 9997640K->9741483K(10028544K), 0.0290270 secs] [Times: user=0.34 sys=0.00, real=0.03 secs]
2021-12-15T04:58:10.363+0900: 307167.212: [GC [PSYoungGen: 279069K->23939K(275456K)] 9996459K->9744718K(10016768K), 0.0356700 secs] [Times: user=0.40 sys=0.00, real=0.04 secs]
2021-12-15T04:58:15.110+0900: 307171.958: [GC [PSYoungGen: 275331K->20312K(280064K)] 9996110K->9744154K(10021376K), 0.0312460 secs] [Times: user=0.37 sys=0.00, real=0.03 secs]
2021-12-15T04:58:19.922+0900: 307176.770: [GC [PSYoungGen: 268120K->22876K(267264K)] 9991962K->9750300K(10008576K), 0.0282790 secs] [Times: user=0.33 sys=0.00, real=0.03 secs]
2021-12-15T04:58:25.108+0900: 307181.956: [GC [PSYoungGen: 267100K->22232K(270848K)] 9994524K->9754632K(10012160K), 0.0293170 secs] [Times: user=0.36 sys=0.00, real=0.03 secs]
2021-12-15T04:58:25.137+0900: 307181.986: [Full GC [PSYoungGen: 22232K->0K(270848K)] [ParOldGen: 9732399K->9247170K(9814016K)] 9754632K->9247170K(10084864K) [PSPermGen: 49072K->49072K(262144K)], 3.3763050 secs] [Times: user=58.53 sys=0.00, real=3.37 secs]

 

이 Agent를 통해 발생하는 gc.log 활용 하여  java old영역의 점유율을 출력하는 sh을 작성하도록 하겠습니다.

vi gc_check.sh
#!/bin/bash
GC_LOG_PATH=/home/user/app/producer-agent/log
 
#GC 파일들을 읽어온다
GC_FILE=`ls -alrt $GC_LOG_PATH | grep gc.agent.log |  awk '{print $8}'`
 
#GC파일의 line을 변수에 저장. for 문으로 활용하시면 좋습니다
line=`tail $GC_LOG_PATH/${GC_FILE[0]} -n1`
 
#NUM1과 NUM2를 정규식으로 파싱하여 old영역 계산
NUM1=$(echo $line | grep Full | awk '{print $8}' | sed "s/(.*$//" | sed "s/K//g" | sed "s/->/ /g" | awk '{print $1}')
NUM2=$(echo $line | grep Full | awk '{print $8}' | sed "s/(.*$//" | sed "s/K//g" | sed "s/->/ /g" | awk '{print $2}')
((NUM2=${NUM2}*100))
((NUM1=${NUM2}/${NUM1}))
 
echo gc log의 old 영역은 $NUM1% 입니다.


출력 결과)

$ ./gc_check.sh
gc log의 old 영역은 95% 입니다.

이렇게 gc.log에서 old영역을 추출함으로서 Producer Agent의  Hang을 체크해 볼 수 있습니다.

GC가 해소되지 않는다는것은 Agent에 악영향을 끼치고 있다는 뜻입니다.

위 sh파일을 활용하여 추이를 관찰하고, 일정주기별로 재기동 하여준다면 아주 좋은 모니터링의 방법이 될 수 있습니다.


 

3. 결론 

최적의 실시간 데이터 모니터링 기법을 적용하여 장애나 데이터 누락을 최소화 하는 것은 Data Engineer의 필수요건입니다.

 

데이터 모니터링에는 정답이 없습니다.

위에서 작성했던 내용처럼 ActiveMQ의 웹콘솔과 CLI를 활용하여도 되고, GC log를 캐치하여 agent 행을 체크해도 됩니다.

만약 위의 방법이 어렵다면 단순하게 metric log를 추출 한 뒤에 이를 시계열 데이터로 출력하여도 됩니다. 이 또한 좋은 모니터링 방법이 될것입니다.  

무엇보다도!!! 다양한 오픈소스를 research 하고, 실전에 적용해보는 연습이 제일 중요합니다.

 

또한, https://github.com/datastacktv/data-engineer-roadmap 깃허브 페이지에서는 최신 트렌드의 Data Engineer 로드맵을 매년 포스팅 해주고 있습니다.

무엇을 해야할 지 모르겠다면 최신동향의 적절한 소프트웨어를 하나 골라 공부 한 뒤에,

자신의 능력와 개발능력을 총 동원하여 클러스터에 맞는 모니터링 솔루션을 개발하시길 바랍니다.

 

 

reference

https://activemq.apache.org/

 

 

1. 개요

빅데이터 클러스터에서 실시간 데이터 모니터링은 어렵습니다.

제가 운영중인 빅데이터분석플랫폼들은 엄청나게 많은 데이터를 처리하고 있습니다.
크기는 무려 페타급 규모이며 어떤 테이블은 모든 파티션의 row가 수십조개나 됩니다. 
이런 데이터들을 실시간으로 모니터링한다면 ? 생각만해도 참 숨이 막혀옵니다.

그 규모가 절대 작지 않습니다.

실시간데이터라고 규모가 작지 않습니다. 
운영중인 클러스터는 수 십 PB급 규모이며, 수십 개 종류의 데이터를 TB단위로 실시간 적재/전송/처리/제공 하고있습니다.

File System대신 memory 친화적이다.

실시간데이터는 대부분 메모리를 통해 빠른전송방식을 택하고있다는것이 일반적이죠.
메모리를 활용하는 Software들은 File System에 제한적인 정보만 저장하기 때문에,  별도로 로그를 남기지 않는 이상 모든데이터를 검사하는것은 어렵습니다.

모든 데이터를 검사 하더라도, 실시간 데이터를 검사하기 위한 리소스도 많이필요할 것입니다.

 

그래도 좋은방법은 존재한다.

하지만 흘러가는 데이터의 스냅샷 등을 주기적으로 관찰하거나, 처리중인 데이터의 추이를 활용한다면?
클러스터에 부담가지 않으면서, 인력도 크게 필요하지 않은 효율적인 모니터링이 될것입니다.

이 문서를 통해서 빅데이터클러스터에서 실시간 데이터의 효율적인 모니터링 방법을 익혀보도록 합시다.

 

2. 모니터링 실전 적용

2.1. ActiveMQ 

Apache ActiveMQ란 무엇인가?

Apache ActiveMQ는 다중프로토콜을 지원하는 Java기반의 메시지 브로커입니다. 아파치 재단의 OSS로 누구나 무료로 사용 할 수 있습니다.

ActiveMQ는 아래와 같은 특성을 가집니다.

  • Java, C, Python, php 등 다양한 언어 간 클라이언트 및 프토토콜을 지원합니다
  • JMS 1.1 및 J2EE 1.4 를 완벽하게 지원하며, 엔터프라이즈 통합 패턴 및 많은 고급 기능을 제공 합니다.
  • JMS(Java Message Service) 클라이언트와 메시지 브로커 모두에서 통합패턴에 대해 완벽히 지원합니다.

ActiveMQ의 메시지는 Producer → Broker → Consumer 구조로 처리되며,
NexR에서 운영하는 클러스터는 Broker에 Queue 모델을 채택하여  클라이언트/서버 상황에 따라 메시지를 전송합니다.

ActiveMQ 웹 UI 콘솔을 활용한 모니터링

ActiveMQ 기본설치시 간단한 Operation을 위한 web ui도 설치됩니다.

Web UI에서는 ActiveMQ를 통해 전송되는 queue 상태를 확인 할 수 있을뿐만 아니라 세부 메세지 내역도 확인이 가능합니다

 

1. ActiveMQ 웹 콘솔 메뉴구성

ActiveMQ 웹 콘솔에 (디폴트 url : localhost:8161/admin/) 접속하면 위와 같은 메뉴구성을 만날 수 있습니다.

  • Home
    Broker의 이름, 버전, memory usage, uptime 등 메세지 시스템의 기본 구성을 보여줍니다
  • Queues
    ActiveMQ를 통해 전송되는 topic들의 queue 상태를 확인 할 수 있습니다.
  • Topics
    ActiveMQ를 통해 전송되는 topic들의 다양한 operation이 가능합니다.

2. Queues 탭


Queues 탭에서는 전송되는 topic들의 queue 현황을 확인할 수 있습니다.

보시면 # of pening Message에 숫자가 꽤 쌓여있는것을 볼 수 있습니다. 영문 그대로 "전송되지 못하고 보류중인 데이터"라는 뜻입니다.
이 숫자는 Enqueued 된 메시지중 Dequeued 되지 못한 메시지 수 와 동일합니다. 

# of pening Message가 지속적으로 쌓인다면 agent가 제대로 메시지를 처리하지 못하고 있는 뜻인데요, 
일부 유실되어도 괜찮은 데이터라면 모르지만 중요한 데이터라면 유실되지 않게 메모리 증설 등 튜닝을 진행하여 줍시다.

 


우측에 보면 3가지 수행이 가능한 Operation 공간이 있습니다.

Send to는 수동으로 message를 보낼 수 있는 기능입니다.
Purge를 활용한다면 pending된 모든 메세지를 없앨 수 있습니다. 이 때 purge를 통해 없어진 데이터들은 모두 유실이 됩니다.
delete는 해당 topic을 지울때 사용합니다. 왠만하면 누를일이 없기 바랍니다.

 

3. 메시지 확인

Queues탭에서 메시지 확인을 원하는 Topic을 클릭한다면, 위 와 같이 메시지들의 정보들을 확인 가능합니다.

확인을 원하는 메시지들을 클릭하면 아래와 같이 세부정보를 확인할 수 있습니다. 

메시지의 메타정보 뿐만 아니라 detail 정보까지 확인이 가능합니다.

이렇게 빠르고 간결하고 편한데 웹콘솔을 사용하지 않을 이유가 없죠?

 

 

 

Intro

Presto는 Facebook의 넘쳐나는 페타바이트급 데이터를 효율적으로 분석하기 위해 2012년도에 kickoff 된 프로젝트 입니다.
2013년 가을 Facebook에서 apache 라이선스를 적용하여 Presto 오픈소스를 공개하였습니다. [FE 발표

즉, 기업이든 연구기관이든 큰 문제없이 무료로 사용할 수 있지요. [아파치 라이선스 2.0 FAQ  ]

 

Presto란 ?

Presto는 Facebook kickoff 프로젝트의 내용처럼 TB, PB급 데이터를 효율적으로 처리하기 위해 만들어졌습니다. 대용량의 데이터를 빠르게 추출하기 원하면 Hive 보다는 Presto를 사용하는 것이 적합합니다.

Presto의 핵심은 두가지입니다.

  • 다양한 소스 지원 - Hive 메타스토어, RDBMS, 아마존 S3, HBase 등 다양한 소스로부터 데이터를 읽어올수 있다.

  • MR보다 빠르다 - MR Job 베이스의 Hive는 중간 단계별 결과를 Disk(HDFS or Local FS)상에 저장하는데, Presto는 이를 Memory상에 저장합니다.

 

아키텍쳐

에코관점의 Presto 아키텍쳐

Presto는 하나의 Coordinator와 실제로 job을 수행하는 여러개의 Worker로 나누어집니다.
Coordinator는 HBase, Hive 등 다양한 데이터 소스등을 읽어와 worker에게 전달을 하는 인터페이스 역할을 합니다.

https://labs.gree.jp/blog/2014/12/12838/ 

 

내부 동작 과정 (Daemon)

  • Coordinator (HIVE의 리소스 매니저와 비슷)
    * Clinet와 직접적은 통신을 하는 Gateway 역할을 함.
    * Client를 통해 들어오는 쿼리를 관리하며, Workerdata 처리를 하게끔 task를 보내줌(= excute).

  • Worker (HIVE의 노드 매니저와 비슷)
    * Coordinator를 통해 받은 task를 기반으로 data source에 접근함.
    * 리턴 결과를 Coordinator가 아닌 바로 Client로 보내 줌.

 

상세 동작 과정

0.    Discovery Service : Worker 실행시 CoordinatorDiscovery Service로 리스트가 등록된다. 이 리스트를 기반으로 CoordinatorWorker에게 excute를 전달 합니다.

1.    ClientHTTP 프로토콜로 쿼리를 Coordinator Parser 에 전달 합니다.

2.    CoordinatorPlanner로 쿼리플랜을 작성 합니다.

3.    Planner가 작성되면 CoordinatorSchedulerWorker가 일을 수행하게끔 task를 전달합니다.(excute)

4.    WorkerConnector plugin을 통해 다양한 Data source로부터 데이터를 읽어옵니다. 이 작업은 Memory상에서 수행됩니다.
(
중요!) 메모리상에서 task가 진행되기 때문에 worker가 부담스러워 하지 않도록 client 딴에서 쿼리 튜닝이 필수입니다. 막 쓰다가 엔지니어와 싸움날 수 있습니다 :(

5.    Worker는 작업이 끝나면 바로 Client로 결과를 보내줍니다.

presto definitive guide storage environment 中

개요

하둡 저장소에 저장된 hive warehouse 디렉토리별 용량을 확인하기 위해 아래 명령어를 사용하였다.

hadoop fs -du -h /user/hive

하지만 아래와 같이 권한 문제로 일부 경로에 접근이 불가하였다.

du: Permission denied: user=username, access=READ_EXECUTE, inode="/user/hive/.staging":hive:hive:drwx------

 

원인

  • 대부분의 현업에서, 엔지니어의 경우 root 계정이 아닌 superuser do(sudo) 실행이 가능한 계정을 받을 것 이다.
  • 그렇기 때문에, hdfs는 user 권한에 따라 탐색이 불가능 할 수 있다.

 

해결

  • 특정유저로 명령어 실행(sudo -u username "명령어")
sudo -u hdfs hadoop fs -du -h /user/hive

ResourceManager

    • YARN 클러스터의 Master 서버로 하나 또는 이중화를 위해 두개의 서버에만 실행됨
    • 클러스터 전체의 리소스를 관리
    • YARN 클러스터의 리소스를 사용하고자 하는 다른 플랫롬으로부터 요청을 받아 리소스 할당(스케줄링)

NodeManager

    • YARN 클러스터의 Worker 서버로 ResourceManager를 제외한 모든 서버에 실행
    • 사용자가 요청한 프로그램을 실행하는 Container fork 시키고 Container를 모니터링 Container 장애 상황 또는 Container가 요청한 리소스보다 많이 사용하고 있는지 감시(요청한 리소스보다 많이 사용하면 해당 Container kill 시킴)

 

Yarn Architecture

  • 1) 리소스매니저는 글러볼 스케줄러라고 정의할 수 있다. 리소스매니저는 전체 클러스터에서 가용한 모든 시스템 자원을 관리한다. 얀 클러스터에서 실행되는 애플리케이션이 리소스를 요청하면 이를 적절하게 분배하고, 리소스 사용 상태를 모니터링한다.

  • 2) 노드매니저는 맵리듀스의 태스크트래커의 기능을 담당한다. 태스크트래커가 각 슬레이브 서버마다 하나의 데몬이 실행된 것처럼 노드매니저도 각 슬레이브에서 하나의 데몬이 실행된다. 노드매니저는 컨테이너(Container)를 실행하고, 컨테이너의 라이프 사이클을 모니터링한다.

  • 3) 컨테이너는 노드매니저가 실행되는 서버의 시스템 자원을 표현한다. CPU, 메모리, 디스크, 네트워크 같은 다양한 시스템 자원을 표현한다. 맵리듀스의 태스크트래커가 태스크 단위로 잡을 실행했다면 노드매니저는 컨테이너 단위로 애플리케이션을 실행하고 각 상태를 스케줄링한다.

  • 4) 애플리케이션마스터는 하나의 애플리케이션을 관리하는 마스터 서버다. 클라이언트가 얀에 애플리케이션 실행을 요청하면 얀은 하나의 애플리케이션에 하나의 애플리케이셔마스터를 할당한다. 예를 들어, 얀 클러스터에 하나의 맵리듀스 잡과 하나의 스톰 애플리케이션 실행을 요청했다면 두 개의 애플리케이션마스터가 실행된다. 애플리케이션마스터는 애플리케이션에 필요한 리소스를 스케줄링하고, 노드매니저에 애플리케이션이 필요한 컨테이너를 실행할 것을 요청한다.

 

+ Recent posts