개요

gitlab이 보안에 취약하다는 메일을 받았다.

대충 gitlab을 삭제하거나 버전을 바꾸라는 내용이다.
추후 git을 활용할 예정이므로 gitlab을 삭제하지 않고 다른버전으로 패치할 것이다.

버전 변경 : 11.9.8 → 11.8.10

참고자료 : https://docs.gitlab.com/ee/update/package/downgrade.html

 

작업

 

1) 설치에 필요한 rpm 파일 옮기기

 

파일 다운로드

필요한 rpm 파일은 https://packages.gitlab.com/gitlab/gitlab-ce 페이지에서 다운가능하다.

쭉 훑어보면 알겠지만 centos6 ( = el6 ) 에서는 13.8.8 이상의 버전으로 업그레이드가 불가능합니다. ( 13.8.8이상은 el7부터 지원함 !! )

 

그렇기 때문에 취약제외 버전중 가장 높은 버전인 11.8.10으로 설치 해줍니다.

11.8.10 설치 경로 : https://packages.gitlab.com/gitlab/gitlab-ce/packages/el/6/gitlab-ce-11.8.10-ce.0.el6.x86_64.rpm

 

파일 옮기기

SFTP를 활용하여 Repository 서버로 gitlab-ce-11.8.10-ce.0.el6.x86_64.rpm 파일을 옮겨 줍시다.

 

REPO 구성하기 (Repo Server)

repo.host.com 서버는 클러스터에서 사용하는 대표 repo 서버입니다.

설치한 rpm을 대상 http 경로로 카피 후, repo를 새로 업데이트해줍니다.

admin@repo.host.com
 
sudo cp ~/henry/gitlab-ce-11.8.10-ce.0.el6.x86_64.rpm /var/www/html/localrepo/git/
 
cd /var/www/html/localrepo/git/
 
sudo rm -r ./repodata
 
sudo createrepo /var/www/html/localrepo/git/

 

YUM을 통한 재설치 (git server)

git.host.com 서버는 클러스터에서 사용하는 git 서버입니다.

admin@git.host.com
 
# yum 캐시 / 레포 초기화
sudo yum clean all
 
# yum repo리스트 다시 불러오기
sudo yum repolist all
 
# yum repo에서 rpm 설치가 가능한지 확인
sudo yum --showduplicates list gitlab-ce
...
...
버전2개 떠야함!
...
...
 
# 현재 설치된 gitlab (상위버전) 삭제
sudo yum remove gitlab-ce
 
# gitlab 재설치 ( 하위버전 )
sudo yum -y install gitlab-ce-11.8.10-ce.0.el6

 

결과 확인

 

소켓통신이란?

Network Layer 에서 Application Layer(응용계층)과 Transport Layer(전송계층) 사이에 존재하는 인터페이스입니다.

소켓을 사용하면 Application Layer에서 Network를 통해 다른 End Point와 통신을 할 수 있습니다.

소켓은 하위 Layer의 이해없이도 Application 계층에서 프로그래밍이 가능하다는 장점이 있습니다. 

 

소켓통신의 essential type

SOCK_STREAM -  채택한 방식

  • a.k.a TCP
  • 안정적인 전송
  • 연결 순서를 보장 해줌
  • 연결 지향적 (server ↔ client 1:1 연결)
  • 채팅, 이메일, HTTP 통신 등 에 사용

SOCK_DGRAN

  • a.k.a UDP
  • 전송이 안정적이지 않음
  • 연결 순서를 보장해주지 않음
  • 패킷이 연결상대(목적지 주소)에 대한 정보 없이 전달 
  • send와 동시에 receive가 가능
  • 동영상스트리밍 서비스, 실시간 온라인게임에 사용

 

소켓 통신 FLOW

TCP 소켓통신은 Server와 client의 end to end 통신이며, Application의 라이브러리에 있는 간단한 메소드로 소켓통신이 가능하다 .

  • socket()
    소켓통신을 위한 소켓 생성.
    소켓생성 직후에는 다른 End point와 바로 통신 할 수 없으며, 리턴타입은 File 타입.

  • connet()
    client에서 server와 연결하기 위해 사용하는 함수.

  • bind()
    다른 소켓을 받아들일 ip와 port, 그리고 서버쪽 socket을 지정해준다.
    통신을 위한 socket(file)에 ip, port를 지정하여 하나의 프로세스로 묶어준다.

  • listen()
    연결 대기중인 client를 관리하는 Queue.
    TCP Socket 통신은 1:1연결이기에 listen() 을 통해서 대기열을 관리한다.

  • accept()
    listen에서 대기중인 클라이언트와 accept를 통해 연결 수용.

  • read/recv(), write/send()
    연결된 상대와 주고받고자 하는 buffer데이터를 송수신 하는 함수.
    server가 client로 부터 close() 수신시 EOF. 

개발 요구사항

  • 전용 인터넷 회선과 보안 VPN을 통해 데이터를 전송하고, 통신방식은 TCP/IP Socket을 사용.

  • 모든 메세지 처리에 대한 응답은 상호 소켓 연결 후 즉시 응답.
    • 응답시간이 N초를 초과할 경우 M회 재시도
    • 그래도 응답이 없을 경우 실패로 간주하여 연결 종료

  •  다수의 발송이 필요할 경우 멀티쓰레드로 구현하며, 멀티쓰레드 최대 N개를 넘지 않는다.

  • 상호 데이터 송수신은 정해진 규격에 따라 진행한다.

동작 설계

  • 동작하는 데몬은 송신용 서버와 수신용 클라이언트로 구성한다.

  • ConfigParser를 사용하여 설정파일(conf.properties)을 읽어온다.

  • logging / handler 모듈을 통해 로그 관리를 한다.

  •  Class를 활용하여 송수신 할 Data Object 정의한다.
    이렇게 하면 추 후 데이터 송수신 규격이 바뀌어도 관리가 용이하다.

  • 소켓 생성 시 멀티스레드를 통해 병렬처리가 가능하게끔 한다.

  • env와 startup을 구성하여 운영할 때 용이하게 한다.

 

스켈레톤

수신용 서버

수신용 데몬 - recv_server.py
#!/usr/bin/python
#-*- coding: utf-8 -*-
'''
    Author : Henry Kim
    Date Created : 2020.xx.xx
    Date Last Modified : 2021.xx.xx
     
    File Version : 1.x.x
    Description :
        * Remote 에서 보내주는 데이터 소켓통신 수신용 서버
        * 동작 과정
            1. Remote로부터 패킷수신 하기 위해 socket bind (recv_server.py)
            2. Remote로부터 패킷수신
            ...
            ...
            3.2 패킷수신 실패시, Remote server에서 별도처리
     
        * 무한루프로 돌며 데몬형식으로 작동.
     
    수정내역 :
        * YYYYMMDD : method_name / @@기능 추가
        * 2020MMDD [v.1.x.x]: configparser 모듈 추가
        * 2020MMDD [v.1.x.x]: logging 모듈 추가   
        ...                          
'''
 
import socket
import threading
import time
import os
import datetime
import ConfigParser
 
import logging
from logging import handlers
 
def read_config() :
    config = {}
    config = ConfigParser.ConfigParser()
    config.read('/file/to/path/conf.properties')
 
    '''
    conf값으로 불러올 변수 선언
    '''
 
def read_logging() :
#logging 설정
    LOG_FILENAME = 'mylog.log'
     
    '''
    Log 포맷과 log handler 작성 후 logging 객체에 부착
    '''
 
class clustertoremote_obj :
     
    #data_to_Remote : list
    def __init__(self, data_to_Remote) :
        self.a = a
        self.b = B
        '''
        Data Object 구성
        '''
 
class remotetokcluster_obj :
 
    #data_from_Remote : list (len : 9)
    def __init__(self, data_from_Remote) :
        self.a = data_from_Remote[0]
        self.b = data_from_Remote[1]
        '''
        Data Object 구성
        '''
 
def do_to_str(Array):
    s = '|'.join(Array)  
    return s
 
def file_write(remoteDo):
 
    file_path = REMOTE_DATA_DIR
    file_name = datetime.datetime.today().strftime('%Y%m%d%H%M') + '_request.txt'
    save_as = file_path+file_name
 
    f=open(save_as,'a')
    f.write(remotedo_to_str(remoteDo)+'\n')
    f.close()
    '''
    Return : True/False
    '''
 
def check_parameter(data):
    myLogger.info(' run parameter check module')
 
    #local variable init
    length='0000'
    val2='REQ'
    val3='000000000'
 
 
    # data의 정합성 체크
        '''
        수신받은 데이터 체크, ERROR 확인 및 분기처리
        Return : 수신받은 데이터의 DO
        '''
    #setting clusterDo
    ...
    clusterDo.valN = 'XXX'
 
    return do_to_str(clusterDo)
 
def handler(client_socket, addr, data) :
    myLogger.info(' run client handler')
 
    '''
    생성된 소켓을 실제로 처리하는 컨트롤러/핸들러 작성
    '''
    client_socket.sendall(return_msg)
 
def server_req(client_socket, addr) :
    myLogger.info('=====================================')
    myLogger.info(' The thread created successfully. The socket connected from %s', addr)
    '''
    소켓 생성후 연결되었을때, 처리하는 함수 작성
    '''
    # 작업 후 소켓 close
    client_socket.close()
    myLogger.info(' socket closed from client ')
    myLogger.info('=====================================')
 
def prepare_socket_recv() :
    global server_socket
 
    #create socket object
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    #error handle
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
    #bind server
    server_socket.bind((HOST,PORT))
 
    '''
    소켓 생성 및 스레드 처리.
    큐 관리.
    '''
 
if __name__ == "__main__" :
    myLogger.info(' RECV SERVER DAEMON START')
    prepare_socket_recv()
    server_socket.close()

 

송신용 클라이언트

송신용 데몬 - send_client.py
#!/usr/bin/python
#-*- coding: utf-8 -*-
'''
    Author : Henry Kim
    Date Created : 2020.xx.xx
    Date Last Modified : 2021.xx.xx
     
    File Version : 1.x.x
    Description :
        * Remote 에게 소켓통신 송신용 클라이언트
        * 동작 과정
            1. Remote로 부터 패킷전송 socket 생성 (send_client.py)
            2. Remote와 부터 패킷연결
            ...
            ...
            3.2 패킷연결 실패시, 해당 파일 재시도
     
        * 무한루프로 돌며 데몬형식으로 작동.
     
    수정내역 :
        * YYYYMMDD : method_name / @@기능 추가
        * 2020MMDD [v.1.x.x]: configparser 모듈 추가
        * 2020MMDD [v.1.x.x]: logging 모듈 추가
        * 2021MMDD [v.1.x.x]: send_file() / file 체크 로직 변경
        ...
'''
import socket
import threading
import time
import os
import datetime
import io
 
import shutil
import ConfigParser
 
import logging
from logging import handlers
 
def read_config() :
    config = {}
    config = ConfigParser.ConfigParser()
    config.read('/file/to/path/conf.properties')
     
    '''
    conf값으로 불러올 변수 선언
    '''
 
def read_logging() :
#logging 설정
    LOG_FILENAME = 'mylog.log'
     
    '''
    Log 포맷과 log handler 작성 후 logging 객체에 부착
    '''
 
 
def get_text(file_full_path):
    send_data = f.readline().replace('\n','')
    return combine_text(send_data)
 
def client_send(file_full_path):
 
    myLogger.info(' read file from.. %s ', file_full_path)
    send_data = get_text(file_full_path)
 
    myLogger.info(' Ready to send data %s', send_data)
    #socket 데이터는 utf-8 (바이트)형태로 송수신. encode 필요
    try :
        '''
        소켓 송신을 위한 내용 작성
        '''
 
    except Exception as e :
            '''
            예외처리 항목 설정
            '''
 
def prepare_socket_send(file_full_path) :
    myLogger.info(' the socket created')
    global to_remote_client_socket
    to_remote_client_socket= socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    while not conn:
        try :
             '''
            소켓 송신을 위한 내용 작성
            '''
        except Exception as e :
            myLogger.error(' connetion error %s ', e)
            '''
            예외처리 항목 설정
            '''
 
    to_remote_client_socket.close()
 
def send_file():
    isNewFile=False
    myLogger.info(' SEND CLIENT DAEMON START')
    while True :
        file_list= os.listdir(remote_RESULT_DIR)
        if len(file_list) > 0 :
            isNewFile = True
        else :
            isNewFile = False
 
 
        if isNewFile :
            '''
            무한루프 돌면서, 송수신 환경이 충족될 경우 소켓송수신 수행
            '''
 
        isNewFile=False
        #file 없을경우 쉼
        time.sleep(FILE_FOUND_SLEEP_TIME)
 
 
def run():
    send_file()
 
if __name__ == "__main__" :
    run()

 

메소드 설명

  • AUTHOR
#!/usr/bin/python
#-*- coding: utf-8 -*-
'''
    Author : Henry Kim
    Date Created : 2020.xx.xx
    Date Last Modified : 2021.xx.xx
     
    File Version : 1.x.x
    Description :
        * Remote 에서 보내주는 데이터 소켓통신 수신용 서버
        * 동작 과정
            1. Remote로부터 패킷수신 하기 위해 socket bind (recv_server.py)
            2. Remote로부터 패킷수신
            ...
            ...
            3.2 패킷수신 실패시, Remote server에서 별도처리
     
        * 무한루프로 돌며 데몬형식으로 작동.
     
    수정내역 :
        * YYYYMMDD : method_name / @@기능 추가
        * 2020MMDD [v.1.x.x]: configparser 모듈 추가
        * 2020MMDD [v.1.x.x]: logging 모듈 추가   
        ...                          
'''

형상관리 툴을 사용할 수 없는 환경에서 소스코드에대한 정보를 기입하는, 일종의 Abstract 같은 존재입니다.

AUTHOR, Date Created, Last Modified 등 기본정보를 적어주고

형상 변환에 따라 버전정보와 수정내역, 기본적인 소스구동원리(Description)을 작성하여줍니다. 

 

  • def read_logging()
def read_logging() :
#logging 설정
    LOG_FILENAME = 'mylog.log'
 
    myLogFormatter = logging.Formatter('%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] %(message)s')
    myLogHandler = handlers.TimedRotatingFileHandler(filename=LOG_DIR+LOG_FILENAME, when='midnight', interval=1, encoding='utf-8')
    myLogHandler.setFormatter(myLogFormatter)
    myLogHandler.suffix = "%Y%m%d"
 
    myLogger = logging.getLogger('server')
    myLogger.setLevel(logging.INFO) # Logging level
    myLogger.addHandler(myLogHandler) # Log 모듈에 myLogHandler 부착

myLogFormatter 는 logging 모듈에서 사용할 포맷을 지정한다. 시간, 수행한 py 스크립트명, 발생위치, 메세지등 형식 지정 가능. 

myLogHandler는 Log 저장경로와, 인코딩 설정, 자정기준으로 새로운파일 생성여부를 지정하여준다.

myLogHandler의 포맷에 myLogFormatter  장착하여 주고 myLogger 에 기본 logging 레벨을 지정한뒤 myLogHandler를 부착하여 준다.

 

  • def do_to_str(Array)
def do_to_str(Array):
    s = '|'.join(Array)  
    return s

입력받은 Array를 구분자 | 로 구분된 텍스트로 변환해주는 함수.

 

  • def file_write(remoteDo)
def file_write(remoteDo):
 
    file_path = REMOTE_DATA_DIR
    file_name = datetime.datetime.today().strftime('%Y%m%d%H%M') + '_request.txt'
    save_as = file_path+file_name
 
    f=open(save_as,'a')
    f.write(remotedo_to_str(remoteDo)+'\n')
    f.close()
 
    if os.path.isfile(save_as):
        myLogger.info(' the file saved success as %s', save_as)
        return True
    else :
        myLogger.error(' can not create %s', save_as)
        return False

Remote서버로부터 소켓으로 전달받은 데이터는 후행 W/F를 위해 txt 파일 형태로 저장해야합니다.

파이썬에 내장된 open함수를 이용해 로컬 FS에 저장 한 후, isfile()을 사용하여 파일 저장여부를 반환합니다.  

 

  • def check_parameter(data)
def check_parameter(data):
    myLogger.info(' run parameter check module')
 
    #local variable init
    length='0000'
    val2='REQ'
    val3='000000000'
 
    finalVal='\n'
 
 
    clusterDo = clustertoRemote_req_obj([length,val2,val3])
 
    numofpipe=data.count('|')
 
    # data의 정합성 체크
    if numofpipe == NUM_OF_COLUMN-1 :
        messageValue ='200'
        data_list = data.split('|')
        remoteDo = remotetocluster_obj(data_list)
        remoteProcessNo = remoteDo.remoteProcessNo
 
        #check request type 400
        if remoteDo.requestType != 'REQ' :
            myLogger.warn(' requestType :  %s' , remoteDo.requestType)
            messageValue = '400'
 
        '''
        수신받은 데이터 체크, ERROR 확인 및 분기처리
        '''
 
 
    #setting clusterDo
    clusterDo.length = '%04d'%len('|'.join([length,val1,val2,...valN])+finalVal)
    clusterDo.requestType = 'XXX'
    ...
    clusterDo.valN = 'XXX'
 
    return do_to_str(clusterDo)

check_parameter는 Remote ↔ 클러스터간 규격서에 어긋나는 데이터형식이 들어올경우 분기처리를 해주는 함수입니다.

messageValue 변수는 클러스터의 정상수신여부/오류정보등을 담고있는 응답코드값입니다.

리턴타입은 클러스터에서 Remote로 보내기 위한 Data Object 입니다.

 

  • def handler(client_socket, addr, data)
def handler(client_socket, addr, data) :
    myLogger.info(' run client handler')
 
    return_msg = check_parameter(data)
     #socket 데이터는 utf-8 (바이트)형태로 송수신. encode 필요
    return_msg = return_msg.encode('utf-8')
 
    myLogger.info(' cluster to remote send msg :  %s' , return_msg)
    client_socket.sendall(return_msg)

socket data를 실제 send하는 함수입니다.

아래에서 설명 할 server_req의 종속함수 이며, recv_server.py 함수 호출의 End Point입니다.

 

  • def server_req(client_socket, addr)
def server_req(client_socket, addr) :
    myLogger.info('=====================================')
    myLogger.info(' The thread created successfully. The socket connected from %s', addr)
    data = client_socket.recv(1024)
    data = data.decode('utf-8')
    myLogger.info(' data recieve success. DATA : %s', data.encode('utf-8'))
 
    data = data.replace('\n','') #\n 문자 제거(cluster Daemon에서 필요 없는 parameter)
    myLogger.info(' remove \n for inner process logic. DATA : %s', data.encode('utf-8'))
 
    if len(data) == 0 :
        myLogger.warn(' Data from client is Empty')
    #socket handler
    handler(client_socket, addr, data)
 
    client_socket.close()
    myLogger.info(' socket closed from client ')
    myLogger.info('=====================================')

Remote서버로 전송할 socket의 환경설정을 담당하며, 소켓방식으로 데이터를 send하는 handler()함수를 실행합니다

아래에 설명할 prepare_socket_recv() 함수의 종속 함수이며, server_req()는 스레드로 생성되어 병렬처리됩니다

 

  • def prepare_socket_recv()
def prepare_socket_recv() :
    global server_socket
 
    #create socket object
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    #error handle
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
    #bind server
    server_socket.bind((HOST,PORT))
 
    myLogger.info('socket bind done %s' ,server_socket)
    server_socket.listen(MAX_CLIENT_NUM)
    myLogger.info('socket wait listen %s' ,server_socket)
    while True :
        try :
            client_socket, addr = server_socket.accept()
            th_server_req = threading.Thread(target=server_req, args =(client_socket, addr))
            th_server_req.daemon=True
            myLogger.info('The thread allocated for server request from remote client. thread-id : %s', th_server_req)
            th_server_req.start()
 
        except KeyboardInterrupt as k :
            myLogger.error(' binded server socket closed by Interrupt %s', k)
            server_socket.close()
            break
 
        except Exception as e :
            myLogger.error(' connetion %s', e)
            server_socket.close()
            break

socket.socket()으로 소켓을 생성하고 .bind()로 실제 송신할 준비를 합니다.

통신 규격 조건에 맞게 listen()을 사용하여 소켓이 대기할 Queue사이즈를 지정하여 줍니다

 threading.Thread를 통해 멀티스레드를 실행하며, 소켓 통신을 위한 server_req 를 스레드로 실행합니다.

이때 try/exception 예외처리를 통해 소켓통신의 온전한 수행을 보장합니다.

 

 

 

 

  • def client_send(file_full_path)
def client_send(file_full_path):
 
    send_data = get_text(file_full_path)
 
    #socket 데이터는 utf-8 (바이트)형태로 송수신. encode 필요
    try :
        to_remote_client_socket.sendall(send_data.encode('utf-8'))
        #응답데이터 수신
        recv_data = to_remote_client_socket.recv(1024) #byte(utf-8)
 
        #성공시
        if recv_data.split('|')[-1] == '200' :
            remove_file(file_full_path)
 
        #수신안되면 종료하기
        else :
            failed_file(file_full_path)
 
    except Exception as e :
            myLogger.error(' to remote socket send error %s ', e)

client_send()은 송신용 데몬에서 소켓을 발송하는 함수입니다.

인코딩, 데이터의 크기 등 소켓의 기본환경을 설정하여주고, 로컬에서 생성된 data 가 발송준비가 되었는지 최종 체크를 진행합니다.

보낼 data에 이상이 없으면 소켓을  통해 보내주며, 발송 성공/실패에 따라 로컬파일 분기처리를 합니다.

아래 설명할 prepare_socket_send() 함수의 종속함수 입니다.

 

  • def prepare_socket_send(file_full_path) 
def prepare_socket_send(file_full_path) :
    global to_remote_client_socket
    to_remote_client_socket= socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    conn=False
    conn_cnt=0
    while not conn:
        try :
            to_remote_client_socket.connect((remote_HOST,PORT))
 
            th_client_send = threading.Thread(target=client_send, args=(file_full_path, ))
            th_client_send.daemon=True
            myLogger.info(' The thread allocated for client send to remote server. thread-id : %s',th_client_send)
            th_client_send.start()
            conn = True
        except Exception as e :
            '''
                예외처리 항목 설정
            '''
 
    to_remote_client_socket.close()

prepare_socket_send() 함수는  client_send()의 스레드를 생성하여 송신용 소켓의 관리를 하는 함수입니다.

상대측과  데이터를 받을 준비가 되었는지 체크를 하며, 정상 연결 되었을경우 하위 스레드를 생성합니다

실제 소켓 송신은  client_send()에서 수행합니다. 이 때 발송할 데이터의 경로를 받아와(file_full_path) 파라미터로 넘겨줍니다.

 

  • def send_file()
def send_file():
    while True :
        file_list= os.listdir(remote_RESULT_DIR)
        if len(file_list) > 0 :
            myLogger.info('=====================================')
            myLogger.info(' File Founded! : %s ',file_list)
            isNewFile = True
        else :
            isNewFile = False
 
 
        if isNewFile :
            #가장 오래된 파일 send
            file_name = os.listdir(remote_RESULT_DIR)[-1]
            file_full_path = remote_RESULT_DIR+file_name
            prepare_socket_send(file_full_path)
 
            file_list= os.listdir(remote_RESULT_DIR)
            if len(file_list) > 0 :
                continue
 
        isNewFile=False

송신용 데몬은 무한루프를 돌면서 remote 서버와 끊임없이 연결을 시도합니다.

단, 네트워크트래픽 부하를 방지하기위해 규격서에 정의된 sleep 타입이 존재합니다.

기본적으로 로컬FS에 저장된 텍스트파일(전송을 위해 분석이 끝난 파일)이 존재하는지도 같이 체크합니다.

상대측 서버와 connection을 맺고 로컬파일이 준비 되었다면 prepare_socket_send() 소켓 송신모듈을 호출합니다

 

참고자료

 

https://docs.python.org/3/howto/sockets.html

http://www.kocw.net/home/search/kemView.do?kemId=1169634

https://codezone4.wordpress.com/2013/02/24/php-socket-programming-basics/

개요

PXC ?

Percona Xtradb는 mysql 엔진인 innodb에 Galera패치 등을 적용한 DB엔진입니다. 
Percoan Xtradb Cluster는 Active/active의 고가용성을 지원하는 솔루션이며 클러스터를 구축하기 위한 핵심 기술인 Codership Galera 라이브러리를 포함하고 있습니다.
Percona의 동작방식을 이해하기 위해선 Galera 클러스터에 대한 이해가 필요합니다.

http://galeracluster.com/에서 제공되는 오픈소스로 Synchronous replication를 지원하는데,
Synchronous replication를 지원하기 위해서는 Galera cluster Architecture에 대한 이해가 필요하며, Galera는 아래의 그림처럼 Certification Based Replication방식으로 동기화 처리를 합니다.

이미지 참고 - http://galeracluster.com/documentation-webpages/galera-documentation.pdf

 

반면에 Maria DB의 경우에는 Replication시 bin 로그 포지션 추적방식을 기본으로 사용합니다.
 bin 로그 추적방식 : [2022.05.02 - [System Engineering] - [Maria DB] Master - Slave Replication 적용.]

PXC는 위 이미지에서 볼 수 있듯, 트랜젝션 발생시 Synchronous replication도 동시에 일어납니다.

 

따라서 PXC는 binlog를 지우더라도 sync에는 전혀 영향이 없는것을 알 수 있습니다.

 

원인

binlog 저장기간 설정값인  expire_logs_days 가 99로 설정되어있는데도 불구하고 bin로그가 무기한 저장되고 있었습니다.

100이상으로 설정하면 '1292 code Warning (형식 오류)' 이 발생합니다.

해결

설정된값 확인

우선 variables 목록을 보는 mysql문으로 expire_logs_days에 설정된 값을 확인하여줍니다.

expire_logs_days값이 99로 설정되어있는걸 확인할 수 있습니다.

 

 

variable 도큐 확인


이미지 참고 - https://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.html

mysql 5.6 document manual에서 expire-logs-days의 설명을보면 지정가능한 최대값이 99이고 기본값은 0 ㅡ무기한저장, no automatic removalㅡ  입니다.

그렇기때문에 100이상은 설정이 불가능하였습니다. (99로 설정되어있던 mysql은 왜 무기한 저장을 하고있었는지 모르겠습니다...버그인가?)

 

MySQL 적용

1. 재기동을 하지않기 글로벌 변수를 cli통해 변경하여 줍니다.

2. 영구 적용을 위해 mycnf도 설정을 변경하여 줍니다
vi /etc/my.cnf

[mysqld]
...
...
expire_logs_days=90

3. 결과 확인
일정 시간이 흐른 후 기존에 있던 binlog들이 사라지고 저장기간이 90일로 설정된것을 확인 할 수 있습니다 (캡처날짜 : 03.29 )

reference 
https://library.gabia.com/contents/infrahosting/2317/

Presto

 

*  Jvm.config

                     -Xmx448G

*  Config.properties

                     query.max-memory= 10752GB 

                     # value is set more than 42% of physical memory (448 * 0.42 = 188)

                     query.max-memory-per-node= 188GB

                     # value of this parameter should be greater than query.max-memory-per-node (448 * 0.50 = 224)

                     query.max-total-memory-per-node= 224GB

 

각 설정값 설명 :  

*  Jvm.config

-Xmx : 코디네이터/ 워커가 jvm에 올리는 최대 힙 메모리입니다. 아래설정값들은 Xmx 값을 기준으로 설정되어야합니다. 서버의 리소스가 많이부족하면 Xmx를 낮추고, 성능향상을 위해서는 Xmx값을 상승시켜 줍니다.

*  Config.properties

query.max-memory : 프레스토 상에서 구동되는 모든 쿼리의 메모리의 최대값입니다. 아래  query.max-total-memory-per-node 설정값 * 총 워커노드수로 계산하면 됩니다.

 query.max-memory-per-node :  워커노드에 단일 쿼리당 최대로 할당할 수 있는 메모리 값입니다. 아래 query.max-total-memory-per-node 의 42%로 지정하시면 됩니다.

query.max-total-memory-per-node= 프레스토 워커가 쿼리구동을 위해 최대로 가져갈 수 있는 메모리의 최대값입니다.Xmx의 50%수준이 적당합니다. 이 파라미터는   query.max-memory-per-node 값 보다 무조건 높아야합니다

2.2. kafka 모니터링

2.2.1. 토픽 스냅샷 활용

Kafka는 broker를 통해서 중재되는 pub-sub (발급-구독) 모델의 메세지 큐 입니다. 
발급자(producer)가 데이터를 보내줄때 남기는 로그를 관찰하거나, consumer를 통해 적재된 데이터를 확인하여 모니터링을 할 수 있습니다.
하지만 앞선 방법들로는 토픽의 실시간 흐름을 확인하기 어렵습니다. 
카프카내에서 흘러가는 데이터를 뜰채로 건져(스냅샷) 컬럼값을 파싱 한 후, 현재시간과 비교 한다면 아주 좋은 모니터링이 될 수 있습니다. 

 

[KAFKA TOPIC 리스트 확인 하는법]     

kafka의 모든 토픽 리스트들은 아래 명령어를 통해 확인 할 수 있습니다.

  • CLI )
    /usr/lib/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper1.localhost.com:2181,zookeeper2.localhost.com:2181,zookeeper3.localhost.com:2181
  • 결과) 토픽 리스트 반환
    userTopic_1
    userTopic_2 

    ..
    userTopic_K

[KAFKA Console Consumer sh 을 활용하여 topic 정보 확인하는법]

kafka-console-consumer.sh은 kafka topic에 있는 데이터를 consume 가능한 쉘입니다. 
sh을 통해 커맨드를 날려도 걱정마세요! sh 이름이 consumer라고해서 실제로 소비되어 없어지는 것은 아닙니다 :P
kafka는 consumer group과 offset 을 통해 데이터가 전송되니깐요 :)
 
참고로, kafka에 저장된 데이터는 운영자가 설정한 기간만큼 저장되었다가 삭제됩니다.

아래명령어를 사용한다면 현재 카프카에 저장된 토픽 userTopic_K 정보를 확인 할 수 있습니다.
--max-messages 옵션을 사용하면 메세지 수를 조절할 수 있고,  --from-beginning 을 통해 시작 위치를 지정할 수 있습니다

  • CLI)
    /usr/lib/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper1.localhost.com:2181,zookeeper2.localhost.com:2181,zookeeper3.localhost.com:2181 --topic userTopic_K --max-messages N (--from-beginning)



  • 결과) kafka를 통해 전송되는 data 출력
    row1, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    row2, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    row3, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    row4, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    row5, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    row6, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM
    ..

    rowN, col1, col2, col3, col4, col5(date), col6(send_date), col7, colM

    Processed a total of N messages
 
 
 

 

[실전 적용]

위에서 설명한 CLI sh파일을 통해 간단한 모니터링 쉘을 작성해 봅시다

vi kafka_topic_monit_test.sh

 


#topic 리스트를 배열로 받아오기
topiclist=`/usr/lib/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper1.localhost.com:2181,zookeeper2.localhost.com:2181,zookeeper3.localhost.com:2181`
 
#consumer.sh를 사용하여 topic 데이터를 뜰채로 1개만 건지자
topic=`/usr/lib/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper1.localhost.com:2181,zookeeper2.localhost.com:2181,zookeeper3.localhost.com:2181 --topic ${topiclist[2]} --max-messages 1 `
 
#topic 내 5번 6번 컬럼 변수저장
col5=`echo $topic | cut -d',' -f 5`
col6=`echo $topic | cut -d',' -f 6`
 
col5_date=`date -d"$col5" "+%Y-%m-%d %H:%M:%S"`
col6_send_date=`date -d"$col6" "+%Y-%m-%d %H:%M:%S"`
 
echo ${topiclist[2]} 토픽 데이터의 클러스터 적재 일자는 $col5_date 입니다
echo ${topiclist[2]} 토픽 데이터의 producer 송신 일자는 $col6_send_date 입니다
echo 현재시간은  `date  "+%Y-%m-%d %H:%M:%S"` 입니다

 

  • 실행 결과 
    아래 결과를 보면 클러스터 적재하자마자 바로 producer를 통해 kafka로 전송되었네요.
    producer 송신일자와 현재시간을 비교해보았을때, 약 카프카 내에서 약 5분의 딜레이가 나는것을 볼 수 있습니다.
$ ./kafka_topic_monit_test.sh

userTopic_K 토픽 데이터의 클러스터 적재 일자는 2021-12-14 03:04:53 입니다
userTopic_K 토픽 데이터의 producer 송신 일자는 2021-12-14 03:04:53 입니다
현재시간은 2021-12-14 03:09:13 입니다

 

2.2.2. 파티션의 오프셋 활용

kafka의 offset 정보를 활용하는것도 모니터링의 아주 좋은 예 입니다.


KT NexR의 대표적인 실시간 빅데이터 솔루션인 린스트림은 kafka 의 offset 정보를 zookeeper에 node id별로 저장하여 관리 하고있습니다.

zookeeper에는 해당 app node의 토픽 정보들이 담겨있습니다. 각 파티션별로 offset의 상태역시 확인이 가능합니다.
파티션별 토픽 offset 정보를 기록하고 일정주기로 추이를 관찰한다면, 시간별 처리량 역시 확인이 가능합니다.

아래에서 실질적 컨슈머인 spark의 node id ( app1-node-3)의 추이를 관찰해 봅시다.

 

[consumer의 offset을 확인하는 법]

  • CLI)
    /usr/lib/zookeeper/bin/zkCli.sh -server localhost get /lean/app/offsets/app1-node-3


  • 결과) app1-node-3 의 offset 정보 출력


Connecting to localhost
WATCHER::WatchedEvent state:SyncConnected type:None path:null
5:16497745458,7:16497813566,1:16497415976,4:16497559539,8:16497739374,3:16497572711,2:16497153812,6:16497762636,0:16497555877,9:16497733103
cZxid = 0x1800e40cde
ctime = Wed Aug 18 17:48:32 KST 2021
mZxid = 0x1a000a559a
mtime = Tue Dec 14 03:34:07 KST 2021
pZxid = 0x1800e40cde
cversion = 0
dataVersion = 168546
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 139
numChildren = 0

[실전 적용]

위에서 설명한 CLI를 활용하여 일정주기 (1분단위)로 partition의 offset을 추적하는 sh을 작성하여 봅시다.

#!/bin/bash
 
function offsetCount (){
   ##  파티션:offset구조의 array를 매개변수로 넣음
   info=(${!1})
   waitInfo=(${!2})
   sum=0
 
   ## 각 파티션별 1분뒤 offset값의 차를 더해서 마지막 sum에 전달
   for (( i=0 ; i < ${#info[@]} ; i++));
   do
      val=0
      info_index=`expr index "${info[$i]}" ':'`
      waitinfo_index=`expr index "${waitInfo[$i]}" ':'`
 
      partition=${info[$i]:0:`expr $info_index - 1`}
      value=${info[$i]:$info_index}
      #echo "partition:$partition"
      #echo "value:$value"
      waitPartition=${waitInfo[$i]:0:`expr $waitinfo_index - 1`}
      waitValue=${waitInfo[$i]:$waitinfo_index}
      #echo "waitpartition:$waitPartition"
      #echo "value:$waitValue"
      if [ $partition == $waitPartition ]
      then
        val=`expr $waitValue - $value`
        sum=`expr $sum + $val`
      fi
   done
   echo $sum
 
}
 
declare -A var       ## -A 옵션은 string 타입의 인덱스 사용할수 있다.
declare -A afterVar
declare -a count     ## -a 옵션은 int 타입의 인덱스 사용할수 있다.
 
#구분자를 ,로 변경 (offset변수의 결과값에서 ,가 띄어쓰기로 변경된다.)
OLD_IFS=$IFS
IFS=,
 
appid="app1-node-3"
 
#오프셋 정보를담은 텍스트를 offset 변수에 저장
offset=`/usr/lib/zookeeper/bin/zkCli.sh -server localhost get /lean/app/offsets/${appid} | sed -n '6p'`
var+=( ["${appid}"]="${offset}" )
 
#다음 offset 체크를위해 60초 대기
sleep 60
 
#60초 후의 offset 정보를 변수에 저장
afterOffset=`/usr/lib/zookeeper/bin/zkCli.sh -server localhost get /lean/app/offsets/${appid} | sed -n '6p'`
afterVar+=( ["${appid}"]="${afterOffset}" )
 
#offset과 afterOffset의 변화량을 모두합산하여준다.
result=$(offsetCount var[${appid}][@] afterVar[${appid}][@])
count+=( "$result" )
 
echo $appid 의 1분동안 처리량 : $count
 
exit 0



출력 결과  ) 

$ ./checkOffset_test.sh
app1-node-3 의 1분동안 처리량 : 20021

20021개로 출력되는것을 볼 수 있다

 

2.2.3. Producer Agent의 Hang 체크

 

[Producer Agent는 왜 행이걸릴까?]

보통 데이터 적재가 안되면 "consumer group에서 행이걸린거겠지..?" 생각하고 consumer app과 KAFKA데몬만 재기동하는 경우가 많습니다. (사실 이 방법이 가장 직설적이며 간단한 방법이지요.)

클러스터는 상황에따라 데이터량이 갑작스럽게 증가 하는 경우가 많습니다.
예측하지 못한 상황에 의해 producer가 갑자기 많은양의 데이터를 감당하지 못하고 Hang이 걸리는 경우가 발생 할 수 있습니다. 

그렇기 때문에 kafka에 데이터를 보내 주는 producer 역시도 모니터링이 필요합니다.
데이터 전송상태를 출력하는 클래스를 Producer Agent에 구현 하는 것 도 하나의 모니터링 방법이 될 수 있습니다.

아래에서는 조금 색다른 방법의 모니터링을 보여드리겠습니다.

 

[실전 적용]

자, 여기 gc.log를 남기는 producer역할의 Agent 가 있습니다.

이 Agent는 Java기반의 프로그램이며, GC또한 존재합니다.

 

jvm 설정을 통해 출력하는 gc.log의 형태는 아래와 같습니다.

gc.log file 명 : gc.agent.log

2021-12-15T04:57:28.602+0900: 307125.450: [GC [PSYoungGen: 292374K->18933K(316928K)] 9972894K->9702027K(10058240K), 0.0203490 secs] [Times: user=0.29 sys=0.00, real=0.02 secs]
2021-12-15T04:57:34.741+0900: 307131.590: [GC [PSYoungGen: 293877K->23686K(316416K)] 9976971K->9718670K(10057728K), 0.0261380 secs] [Times: user=0.29 sys=0.02, real=0.02 secs]
2021-12-15T04:57:41.501+0900: 307138.350: [GC [PSYoungGen: 298630K->27418K(298496K)] 9993614K->9725402K(10039808K), 0.0345700 secs] [Times: user=0.27 sys=0.01, real=0.03 secs]
2021-12-15T04:57:47.930+0900: 307144.779: [GC [PSYoungGen: 298266K->27540K(294400K)] 9996250K->9728638K(10035712K), 0.0259060 secs] [Times: user=0.27 sys=0.00, real=0.03 secs]
2021-12-15T04:57:52.472+0900: 307149.320: [GC [PSYoungGen: 294292K->24043K(286720K)] 9995390K->9729680K(10028032K), 0.0279990 secs] [Times: user=0.36 sys=0.01, real=0.03 secs]
2021-12-15T04:57:58.552+0900: 307155.401: [GC [PSYoungGen: 286694K->25312K(284160K)] 9992330K->9739092K(10025472K), 0.0286700 secs] [Times: user=0.33 sys=0.01, real=0.03 secs]
2021-12-15T04:58:04.579+0900: 307161.428: [GC [PSYoungGen: 283861K->24093K(287232K)] 9997640K->9741483K(10028544K), 0.0290270 secs] [Times: user=0.34 sys=0.00, real=0.03 secs]
2021-12-15T04:58:10.363+0900: 307167.212: [GC [PSYoungGen: 279069K->23939K(275456K)] 9996459K->9744718K(10016768K), 0.0356700 secs] [Times: user=0.40 sys=0.00, real=0.04 secs]
2021-12-15T04:58:15.110+0900: 307171.958: [GC [PSYoungGen: 275331K->20312K(280064K)] 9996110K->9744154K(10021376K), 0.0312460 secs] [Times: user=0.37 sys=0.00, real=0.03 secs]
2021-12-15T04:58:19.922+0900: 307176.770: [GC [PSYoungGen: 268120K->22876K(267264K)] 9991962K->9750300K(10008576K), 0.0282790 secs] [Times: user=0.33 sys=0.00, real=0.03 secs]
2021-12-15T04:58:25.108+0900: 307181.956: [GC [PSYoungGen: 267100K->22232K(270848K)] 9994524K->9754632K(10012160K), 0.0293170 secs] [Times: user=0.36 sys=0.00, real=0.03 secs]
2021-12-15T04:58:25.137+0900: 307181.986: [Full GC [PSYoungGen: 22232K->0K(270848K)] [ParOldGen: 9732399K->9247170K(9814016K)] 9754632K->9247170K(10084864K) [PSPermGen: 49072K->49072K(262144K)], 3.3763050 secs] [Times: user=58.53 sys=0.00, real=3.37 secs]

 

이 Agent를 통해 발생하는 gc.log 활용 하여  java old영역의 점유율을 출력하는 sh을 작성하도록 하겠습니다.

vi gc_check.sh
#!/bin/bash
GC_LOG_PATH=/home/user/app/producer-agent/log
 
#GC 파일들을 읽어온다
GC_FILE=`ls -alrt $GC_LOG_PATH | grep gc.agent.log |  awk '{print $8}'`
 
#GC파일의 line을 변수에 저장. for 문으로 활용하시면 좋습니다
line=`tail $GC_LOG_PATH/${GC_FILE[0]} -n1`
 
#NUM1과 NUM2를 정규식으로 파싱하여 old영역 계산
NUM1=$(echo $line | grep Full | awk '{print $8}' | sed "s/(.*$//" | sed "s/K//g" | sed "s/->/ /g" | awk '{print $1}')
NUM2=$(echo $line | grep Full | awk '{print $8}' | sed "s/(.*$//" | sed "s/K//g" | sed "s/->/ /g" | awk '{print $2}')
((NUM2=${NUM2}*100))
((NUM1=${NUM2}/${NUM1}))
 
echo gc log의 old 영역은 $NUM1% 입니다.


출력 결과)

$ ./gc_check.sh
gc log의 old 영역은 95% 입니다.

이렇게 gc.log에서 old영역을 추출함으로서 Producer Agent의  Hang을 체크해 볼 수 있습니다.

GC가 해소되지 않는다는것은 Agent에 악영향을 끼치고 있다는 뜻입니다.

위 sh파일을 활용하여 추이를 관찰하고, 일정주기별로 재기동 하여준다면 아주 좋은 모니터링의 방법이 될 수 있습니다.


 

3. 결론 

최적의 실시간 데이터 모니터링 기법을 적용하여 장애나 데이터 누락을 최소화 하는 것은 Data Engineer의 필수요건입니다.

 

데이터 모니터링에는 정답이 없습니다.

위에서 작성했던 내용처럼 ActiveMQ의 웹콘솔과 CLI를 활용하여도 되고, GC log를 캐치하여 agent 행을 체크해도 됩니다.

만약 위의 방법이 어렵다면 단순하게 metric log를 추출 한 뒤에 이를 시계열 데이터로 출력하여도 됩니다. 이 또한 좋은 모니터링 방법이 될것입니다.  

무엇보다도!!! 다양한 오픈소스를 research 하고, 실전에 적용해보는 연습이 제일 중요합니다.

 

또한, https://github.com/datastacktv/data-engineer-roadmap 깃허브 페이지에서는 최신 트렌드의 Data Engineer 로드맵을 매년 포스팅 해주고 있습니다.

무엇을 해야할 지 모르겠다면 최신동향의 적절한 소프트웨어를 하나 골라 공부 한 뒤에,

자신의 능력와 개발능력을 총 동원하여 클러스터에 맞는 모니터링 솔루션을 개발하시길 바랍니다.

 

 

reference

https://activemq.apache.org/

 

 

1. 개요

빅데이터 클러스터에서 실시간 데이터 모니터링은 어렵습니다.

제가 운영중인 빅데이터분석플랫폼들은 엄청나게 많은 데이터를 처리하고 있습니다.
크기는 무려 페타급 규모이며 어떤 테이블은 모든 파티션의 row가 수십조개나 됩니다. 
이런 데이터들을 실시간으로 모니터링한다면 ? 생각만해도 참 숨이 막혀옵니다.

그 규모가 절대 작지 않습니다.

실시간데이터라고 규모가 작지 않습니다. 
운영중인 클러스터는 수 십 PB급 규모이며, 수십 개 종류의 데이터를 TB단위로 실시간 적재/전송/처리/제공 하고있습니다.

File System대신 memory 친화적이다.

실시간데이터는 대부분 메모리를 통해 빠른전송방식을 택하고있다는것이 일반적이죠.
메모리를 활용하는 Software들은 File System에 제한적인 정보만 저장하기 때문에,  별도로 로그를 남기지 않는 이상 모든데이터를 검사하는것은 어렵습니다.

모든 데이터를 검사 하더라도, 실시간 데이터를 검사하기 위한 리소스도 많이필요할 것입니다.

 

그래도 좋은방법은 존재한다.

하지만 흘러가는 데이터의 스냅샷 등을 주기적으로 관찰하거나, 처리중인 데이터의 추이를 활용한다면?
클러스터에 부담가지 않으면서, 인력도 크게 필요하지 않은 효율적인 모니터링이 될것입니다.

이 문서를 통해서 빅데이터클러스터에서 실시간 데이터의 효율적인 모니터링 방법을 익혀보도록 합시다.

 

2. 모니터링 실전 적용

2.1. ActiveMQ 

Apache ActiveMQ란 무엇인가?

Apache ActiveMQ는 다중프로토콜을 지원하는 Java기반의 메시지 브로커입니다. 아파치 재단의 OSS로 누구나 무료로 사용 할 수 있습니다.

ActiveMQ는 아래와 같은 특성을 가집니다.

  • Java, C, Python, php 등 다양한 언어 간 클라이언트 및 프토토콜을 지원합니다
  • JMS 1.1 및 J2EE 1.4 를 완벽하게 지원하며, 엔터프라이즈 통합 패턴 및 많은 고급 기능을 제공 합니다.
  • JMS(Java Message Service) 클라이언트와 메시지 브로커 모두에서 통합패턴에 대해 완벽히 지원합니다.

ActiveMQ의 메시지는 Producer → Broker → Consumer 구조로 처리되며,
NexR에서 운영하는 클러스터는 Broker에 Queue 모델을 채택하여  클라이언트/서버 상황에 따라 메시지를 전송합니다.

ActiveMQ 웹 UI 콘솔을 활용한 모니터링

ActiveMQ 기본설치시 간단한 Operation을 위한 web ui도 설치됩니다.

Web UI에서는 ActiveMQ를 통해 전송되는 queue 상태를 확인 할 수 있을뿐만 아니라 세부 메세지 내역도 확인이 가능합니다

 

1. ActiveMQ 웹 콘솔 메뉴구성

ActiveMQ 웹 콘솔에 (디폴트 url : localhost:8161/admin/) 접속하면 위와 같은 메뉴구성을 만날 수 있습니다.

  • Home
    Broker의 이름, 버전, memory usage, uptime 등 메세지 시스템의 기본 구성을 보여줍니다
  • Queues
    ActiveMQ를 통해 전송되는 topic들의 queue 상태를 확인 할 수 있습니다.
  • Topics
    ActiveMQ를 통해 전송되는 topic들의 다양한 operation이 가능합니다.

2. Queues 탭


Queues 탭에서는 전송되는 topic들의 queue 현황을 확인할 수 있습니다.

보시면 # of pening Message에 숫자가 꽤 쌓여있는것을 볼 수 있습니다. 영문 그대로 "전송되지 못하고 보류중인 데이터"라는 뜻입니다.
이 숫자는 Enqueued 된 메시지중 Dequeued 되지 못한 메시지 수 와 동일합니다. 

# of pening Message가 지속적으로 쌓인다면 agent가 제대로 메시지를 처리하지 못하고 있는 뜻인데요, 
일부 유실되어도 괜찮은 데이터라면 모르지만 중요한 데이터라면 유실되지 않게 메모리 증설 등 튜닝을 진행하여 줍시다.

 


우측에 보면 3가지 수행이 가능한 Operation 공간이 있습니다.

Send to는 수동으로 message를 보낼 수 있는 기능입니다.
Purge를 활용한다면 pending된 모든 메세지를 없앨 수 있습니다. 이 때 purge를 통해 없어진 데이터들은 모두 유실이 됩니다.
delete는 해당 topic을 지울때 사용합니다. 왠만하면 누를일이 없기 바랍니다.

 

3. 메시지 확인

Queues탭에서 메시지 확인을 원하는 Topic을 클릭한다면, 위 와 같이 메시지들의 정보들을 확인 가능합니다.

확인을 원하는 메시지들을 클릭하면 아래와 같이 세부정보를 확인할 수 있습니다. 

메시지의 메타정보 뿐만 아니라 detail 정보까지 확인이 가능합니다.

이렇게 빠르고 간결하고 편한데 웹콘솔을 사용하지 않을 이유가 없죠?

 

 

 

+ Recent posts