2.2. kafka 모니터링

2.2.1. 토픽 스냅샷 활용

Kafka는 broker를 통해서 중재되는 pub-sub (발급-구독) 모델의 메세지 큐 입니다. 
발급자(producer)가 데이터를 보내줄때 남기는 로그를 관찰하거나, consumer를 통해 적재된 데이터를 확인하여 모니터링을 할 수 있습니다.
하지만 앞선 방법들로는 토픽의 실시간 흐름을 확인하기 어렵습니다. 
카프카내에서 흘러가는 데이터를 뜰채로 건져(스냅샷) 컬럼값을 파싱 한 후, 현재시간과 비교 한다면 아주 좋은 모니터링이 될 수 있습니다. 

 

[KAFKA TOPIC 리스트 확인 하는법]     

kafka의 모든 토픽 리스트들은 아래 명령어를 통해 확인 할 수 있습니다.

  • CLI )
    /usr/lib/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper1.localhost.com:2181,zookeeper2.localhost.com:2181,zookeeper3.localhost.com:2181
  • 결과) 토픽 리스트 반환
    userTopic_1
    userTopic_2 

    ..
    userTopic_K

[KAFKA Console Consumer sh 을 활용하여 topic 정보 확인하는법]

kafka-console-consumer.sh은 kafka topic에 있는 데이터를 consume 가능한 쉘입니다. 
sh을 통해 커맨드를 날려도 걱정마세요! sh 이름이 consumer라고해서 실제로 소비되어 없어지는 것은 아닙니다 :P
kafka는 consumer group과 offset 을 통해 데이터가 전송되니깐요 :)
 
참고로, kafka에 저장된 데이터는 운영자가 설정한 기간만큼 저장되었다가 삭제됩니다.

아래명령어를 사용한다면 현재 카프카에 저장된 토픽 userTopic_K 정보를 확인 할 수 있습니다.
--max-messages 옵션을 사용하면 메세지 수를 조절할 수 있고,  --from-beginning 을 통해 시작 위치를 지정할 수 있습니다

  • CLI)
    /usr/lib/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper1.localhost.com:2181,zookeeper2.localhost.com:2181,zookeeper3.localhost.com:2181 --topic userTopic_K --max-messages N (--from-beginning)



  • 결과) kafka를 통해 전송되는 data 출력
    row1, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    row2, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    row3, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    row4, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    row5, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    row6, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    ..

    rowN, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM

    Processed a total of N messages
 
 
 

 

[실전 적용]

위에서 설명한 CLI sh파일을 통해 간단한 모니터링 쉘을 작성해 봅시다

vi kafka_topic_monit_test.sh

 


#topic 리스트를 배열로 받아오기
topiclist=`/usr/lib/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper1.localhost.com:2181,zookeeper2.localhost.com:2181,zookeeper3.localhost.com:2181`
 
#consumer.sh를 사용하여 topic 데이터를 뜰채로 1개만 건지자
topic=`/usr/lib/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper1.localhost.com:2181,zookeeper2.localhost.com:2181,zookeeper3.localhost.com:2181 --topic ${topiclist[2]} --max-messages 1 `
 
#topic 내 5번 6번 컬럼 변수저장
col5=`echo $topic | cut -d',' -f 5`
col6=`echo $topic | cut -d',' -f 6`
 
col5_date=`date -d"$col5" "+%Y-%m-%d %H:%M:%S"`
col6_send_date=`date -d"$col6" "+%Y-%m-%d %H:%M:%S"`
 
echo ${topiclist[2]} 토픽 데이터의 클러스터 적재 일자는 $col5_date 입니다
echo ${topiclist[2]} 토픽 데이터의 producer 송신 일자는 $col6_send_date 입니다
echo 현재시간은  `date  "+%Y-%m-%d %H:%M:%S"` 입니다

 

  • 실행 결과 
    아래 결과를 보면 클러스터 적재하자마자 바로 producer를 통해 kafka로 전송되었네요.
    producer 송신일자와 현재시간을 비교해보았을때, 약 카프카 내에서 약 5분의 딜레이가 나는것을 볼 수 있습니다.
$ ./kafka_topic_monit_test.sh

userTopic_K 토픽 데이터의 클러스터 적재 일자는 2021-12-14 03:04:53 입니다
userTopic_K 토픽 데이터의 producer 송신 일자는 2021-12-14 03:04:53 입니다
현재시간은 2021-12-14 03:09:13 입니다

 

2.2.2. 파티션의 오프셋 활용

kafka의 offset 정보를 활용하는것도 모니터링의 아주 좋은 예 입니다.


KT NexR의 대표적인 실시간 빅데이터 솔루션인 린스트림은 kafka 의 offset 정보를 zookeeper에 node id별로 저장하여 관리 하고있습니다.

zookeeper에는 해당 app node의 토픽 정보들이 담겨있습니다. 각 파티션별로 offset의 상태역시 확인이 가능합니다.
파티션별 토픽 offset 정보를 기록하고 일정주기로 추이를 관찰한다면, 시간별 처리량 역시 확인이 가능합니다.

아래에서 실질적 컨슈머인 spark의 node id ( app1-node-3)의 추이를 관찰해 봅시다.

 

[consumer의 offset을 확인하는 법]

  • CLI)
    /usr/lib/zookeeper/bin/zkCli.sh -server localhost get /lean/app/offsets/app1-node-3


  • 결과) app1-node-3 의 offset 정보 출력


Connecting to localhost
WATCHER::WatchedEvent state:SyncConnected type:None path:null
5:16497745458,7:16497813566,1:16497415976,4:16497559539,8:16497739374,3:16497572711,2:16497153812,6:16497762636,0:16497555877,9:16497733103
cZxid = 0x1800e40cde
ctime = Wed Aug 18 17:48:32 KST 2021
mZxid = 0x1a000a559a
mtime = Tue Dec 14 03:34:07 KST 2021
pZxid = 0x1800e40cde
cversion = 0
dataVersion = 168546
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 139
numChildren = 0

[실전 적용]

위에서 설명한 CLI를 활용하여 일정주기 (1분단위)로 partition의 offset을 추적하는 sh을 작성하여 봅시다.

#!/bin/bash
 
function offsetCount (){
   ##  파티션:offset구조의 array를 매개변수로 넣음
   info=(${!1})
   waitInfo=(${!2})
   sum=0
 
   ## 각 파티션별 1분뒤 offset값의 차를 더해서 마지막 sum에 전달
   for (( i=0 ; i < ${#info[@]} ; i++));
   do
      val=0
      info_index=`expr index "${info[$i]}" ':'`
      waitinfo_index=`expr index "${waitInfo[$i]}" ':'`
 
      partition=${info[$i]:0:`expr $info_index - 1`}
      value=${info[$i]:$info_index}
      #echo "partition:$partition"
      #echo "value:$value"
      waitPartition=${waitInfo[$i]:0:`expr $waitinfo_index - 1`}
      waitValue=${waitInfo[$i]:$waitinfo_index}
      #echo "waitpartition:$waitPartition"
      #echo "value:$waitValue"
      if [ $partition == $waitPartition ]
      then
        val=`expr $waitValue - $value`
        sum=`expr $sum + $val`
      fi
   done
   echo $sum
 
}
 
declare -A var       ## -A 옵션은 string 타입의 인덱스 사용할수 있다.
declare -A afterVar
declare -a count     ## -a 옵션은 int 타입의 인덱스 사용할수 있다.
 
#구분자를 ,로 변경 (offset변수의 결과값에서 ,가 띄어쓰기로 변경된다.)
OLD_IFS=$IFS
IFS=,
 
appid="app1-node-3"
 
#오프셋 정보를담은 텍스트를 offset 변수에 저장
offset=`/usr/lib/zookeeper/bin/zkCli.sh -server localhost get /lean/app/offsets/${appid} | sed -n '6p'`
var+=( ["${appid}"]="${offset}" )
 
#다음 offset 체크를위해 60초 대기
sleep 60
 
#60초 후의 offset 정보를 변수에 저장
afterOffset=`/usr/lib/zookeeper/bin/zkCli.sh -server localhost get /lean/app/offsets/${appid} | sed -n '6p'`
afterVar+=( ["${appid}"]="${afterOffset}" )
 
#offset과 afterOffset의 변화량을 모두합산하여준다.
result=$(offsetCount var[${appid}][@] afterVar[${appid}][@])
count+=( "$result" )
 
echo $appid 의 1분동안 처리량 : $count
 
exit 0



출력 결과  ) 

$ ./checkOffset_test.sh
app1-node-3 의 1분동안 처리량 : 20021

20021개로 출력되는것을 볼 수 있다

 

2.2.3. Producer Agent의 Hang 체크

 

[Producer Agent는 왜 행이걸릴까?]

보통 데이터 적재가 안되면 "consumer group에서 행이걸린거겠지..?" 생각하고 consumer app과 KAFKA데몬만 재기동하는 경우가 많습니다. (사실 이 방법이 가장 직설적이며 간단한 방법이지요.)

클러스터는 상황에따라 데이터량이 갑작스럽게 증가 하는 경우가 많습니다.
예측하지 못한 상황에 의해 producer가 갑자기 많은양의 데이터를 감당하지 못하고 Hang이 걸리는 경우가 발생 할 수 있습니다. 

그렇기 때문에 kafka에 데이터를 보내 주는 producer 역시도 모니터링이 필요합니다.
데이터 전송상태를 출력하는 클래스를 Producer Agent에 구현 하는 것 도 하나의 모니터링 방법이 될 수 있습니다.

아래에서는 조금 색다른 방법의 모니터링을 보여드리겠습니다.

 

[실전 적용]

자, 여기 gc.log를 남기는 producer역할의 Agent 가 있습니다.

이 Agent는 Java기반의 프로그램이며, GC또한 존재합니다.

 

jvm 설정을 통해 출력하는 gc.log의 형태는 아래와 같습니다.

gc.log file 명 : gc.agent.log

2021-12-15T04:57:28.602+0900: 307125.450: [GC [PSYoungGen: 292374K->18933K(316928K)] 9972894K->9702027K(10058240K), 0.0203490 secs] [Times: user=0.29 sys=0.00, real=0.02 secs]
2021-12-15T04:57:34.741+0900: 307131.590: [GC [PSYoungGen: 293877K->23686K(316416K)] 9976971K->9718670K(10057728K), 0.0261380 secs] [Times: user=0.29 sys=0.02, real=0.02 secs]
2021-12-15T04:57:41.501+0900: 307138.350: [GC [PSYoungGen: 298630K->27418K(298496K)] 9993614K->9725402K(10039808K), 0.0345700 secs] [Times: user=0.27 sys=0.01, real=0.03 secs]
2021-12-15T04:57:47.930+0900: 307144.779: [GC [PSYoungGen: 298266K->27540K(294400K)] 9996250K->9728638K(10035712K), 0.0259060 secs] [Times: user=0.27 sys=0.00, real=0.03 secs]
2021-12-15T04:57:52.472+0900: 307149.320: [GC [PSYoungGen: 294292K->24043K(286720K)] 9995390K->9729680K(10028032K), 0.0279990 secs] [Times: user=0.36 sys=0.01, real=0.03 secs]
2021-12-15T04:57:58.552+0900: 307155.401: [GC [PSYoungGen: 286694K->25312K(284160K)] 9992330K->9739092K(10025472K), 0.0286700 secs] [Times: user=0.33 sys=0.01, real=0.03 secs]
2021-12-15T04:58:04.579+0900: 307161.428: [GC [PSYoungGen: 283861K->24093K(287232K)] 9997640K->9741483K(10028544K), 0.0290270 secs] [Times: user=0.34 sys=0.00, real=0.03 secs]
2021-12-15T04:58:10.363+0900: 307167.212: [GC [PSYoungGen: 279069K->23939K(275456K)] 9996459K->9744718K(10016768K), 0.0356700 secs] [Times: user=0.40 sys=0.00, real=0.04 secs]
2021-12-15T04:58:15.110+0900: 307171.958: [GC [PSYoungGen: 275331K->20312K(280064K)] 9996110K->9744154K(10021376K), 0.0312460 secs] [Times: user=0.37 sys=0.00, real=0.03 secs]
2021-12-15T04:58:19.922+0900: 307176.770: [GC [PSYoungGen: 268120K->22876K(267264K)] 9991962K->9750300K(10008576K), 0.0282790 secs] [Times: user=0.33 sys=0.00, real=0.03 secs]
2021-12-15T04:58:25.108+0900: 307181.956: [GC [PSYoungGen: 267100K->22232K(270848K)] 9994524K->9754632K(10012160K), 0.0293170 secs] [Times: user=0.36 sys=0.00, real=0.03 secs]
2021-12-15T04:58:25.137+0900: 307181.986: [Full GC [PSYoungGen: 22232K->0K(270848K)] [ParOldGen: 9732399K->9247170K(9814016K)] 9754632K->9247170K(10084864K) [PSPermGen: 49072K->49072K(262144K)], 3.3763050 secs] [Times: user=58.53 sys=0.00, real=3.37 secs]

 

이 Agent를 통해 발생하는 gc.log 활용 하여  java old영역의 점유율을 출력하는 sh을 작성하도록 하겠습니다.

vi gc_check.sh
#!/bin/bash
GC_LOG_PATH=/home/user/app/producer-agent/log
 
#GC 파일들을 읽어온다
GC_FILE=`ls -alrt $GC_LOG_PATH | grep gc.agent.log |  awk '{print $8}'`
 
#GC파일의 line을 변수에 저장. for 문으로 활용하시면 좋습니다
line=`tail $GC_LOG_PATH/${GC_FILE[0]} -n1`
 
#NUM1과 NUM2를 정규식으로 파싱하여 old영역 계산
NUM1=$(echo $line | grep Full | awk '{print $8}' | sed "s/(.*$//" | sed "s/K//g" | sed "s/->/ /g" | awk '{print $1}')
NUM2=$(echo $line | grep Full | awk '{print $8}' | sed "s/(.*$//" | sed "s/K//g" | sed "s/->/ /g" | awk '{print $2}')
((NUM2=${NUM2}*100))
((NUM1=${NUM2}/${NUM1}))
 
echo gc log의 old 영역은 $NUM1% 입니다.


출력 결과)

$ ./gc_check.sh
gc log의 old 영역은 95% 입니다.

이렇게 gc.log에서 old영역을 추출함으로서 Producer Agent의  Hang을 체크해 볼 수 있습니다.

GC가 해소되지 않는다는것은 Agent에 악영향을 끼치고 있다는 뜻입니다.

위 sh파일을 활용하여 추이를 관찰하고, 일정주기별로 재기동 하여준다면 아주 좋은 모니터링의 방법이 될 수 있습니다.


 

3. 결론 

최적의 실시간 데이터 모니터링 기법을 적용하여 장애나 데이터 누락을 최소화 하는 것은 Data Engineer의 필수요건입니다.

 

데이터 모니터링에는 정답이 없습니다.

위에서 작성했던 내용처럼 ActiveMQ의 웹콘솔과 CLI를 활용하여도 되고, GC log를 캐치하여 agent 행을 체크해도 됩니다.

만약 위의 방법이 어렵다면 단순하게 metric log를 추출 한 뒤에 이를 시계열 데이터로 출력하여도 됩니다. 이 또한 좋은 모니터링 방법이 될것입니다.  

무엇보다도!!! 다양한 오픈소스를 research 하고, 실전에 적용해보는 연습이 제일 중요합니다.

 

또한, https://github.com/datastacktv/data-engineer-roadmap 깃허브 페이지에서는 최신 트렌드의 Data Engineer 로드맵을 매년 포스팅 해주고 있습니다.

무엇을 해야할 지 모르겠다면 최신동향의 적절한 소프트웨어를 하나 골라 공부 한 뒤에,

자신의 능력와 개발능력을 총 동원하여 클러스터에 맞는 모니터링 솔루션을 개발하시길 바랍니다.

 

 

reference

https://activemq.apache.org/

 

 

+ Recent posts