소켓통신이란?

Network Layer 에서 Application Layer(응용계층)과 Transport Layer(전송계층) 사이에 존재하는 인터페이스입니다.

소켓을 사용하면 Application Layer에서 Network를 통해 다른 End Point와 통신을 할 수 있습니다.

소켓은 하위 Layer의 이해없이도 Application 계층에서 프로그래밍이 가능하다는 장점이 있습니다. 

 

소켓통신의 essential type

SOCK_STREAM -  채택한 방식

  • a.k.a TCP
  • 안정적인 전송
  • 연결 순서를 보장 해줌
  • 연결 지향적 (server ↔ client 1:1 연결)
  • 채팅, 이메일, HTTP 통신 등 에 사용

SOCK_DGRAN

  • a.k.a UDP
  • 전송이 안정적이지 않음
  • 연결 순서를 보장해주지 않음
  • 패킷이 연결상대(목적지 주소)에 대한 정보 없이 전달 
  • send와 동시에 receive가 가능
  • 동영상스트리밍 서비스, 실시간 온라인게임에 사용

 

소켓 통신 FLOW

TCP 소켓통신은 Server와 client의 end to end 통신이며, Application의 라이브러리에 있는 간단한 메소드로 소켓통신이 가능하다 .

  • socket()
    소켓통신을 위한 소켓 생성.
    소켓생성 직후에는 다른 End point와 바로 통신 할 수 없으며, 리턴타입은 File 타입.

  • connet()
    client에서 server와 연결하기 위해 사용하는 함수.

  • bind()
    다른 소켓을 받아들일 ip와 port, 그리고 서버쪽 socket을 지정해준다.
    통신을 위한 socket(file)에 ip, port를 지정하여 하나의 프로세스로 묶어준다.

  • listen()
    연결 대기중인 client를 관리하는 Queue.
    TCP Socket 통신은 1:1연결이기에 listen() 을 통해서 대기열을 관리한다.

  • accept()
    listen에서 대기중인 클라이언트와 accept를 통해 연결 수용.

  • read/recv(), write/send()
    연결된 상대와 주고받고자 하는 buffer데이터를 송수신 하는 함수.
    server가 client로 부터 close() 수신시 EOF. 

개발 요구사항

  • 전용 인터넷 회선과 보안 VPN을 통해 데이터를 전송하고, 통신방식은 TCP/IP Socket을 사용.

  • 모든 메세지 처리에 대한 응답은 상호 소켓 연결 후 즉시 응답.
    • 응답시간이 N초를 초과할 경우 M회 재시도
    • 그래도 응답이 없을 경우 실패로 간주하여 연결 종료

  •  다수의 발송이 필요할 경우 멀티쓰레드로 구현하며, 멀티쓰레드 최대 N개를 넘지 않는다.

  • 상호 데이터 송수신은 정해진 규격에 따라 진행한다.

동작 설계

  • 동작하는 데몬은 송신용 서버와 수신용 클라이언트로 구성한다.

  • ConfigParser를 사용하여 설정파일(conf.properties)을 읽어온다.

  • logging / handler 모듈을 통해 로그 관리를 한다.

  •  Class를 활용하여 송수신 할 Data Object 정의한다.
    이렇게 하면 추 후 데이터 송수신 규격이 바뀌어도 관리가 용이하다.

  • 소켓 생성 시 멀티스레드를 통해 병렬처리가 가능하게끔 한다.

  • env와 startup을 구성하여 운영할 때 용이하게 한다.

 

스켈레톤

수신용 서버

수신용 데몬 - recv_server.py
#!/usr/bin/python
#-*- coding: utf-8 -*-
'''
    Author : Henry Kim
    Date Created : 2020.xx.xx
    Date Last Modified : 2021.xx.xx
     
    File Version : 1.x.x
    Description :
        * Remote 에서 보내주는 데이터 소켓통신 수신용 서버
        * 동작 과정
            1. Remote로부터 패킷수신 하기 위해 socket bind (recv_server.py)
            2. Remote로부터 패킷수신
            ...
            ...
            3.2 패킷수신 실패시, Remote server에서 별도처리
     
        * 무한루프로 돌며 데몬형식으로 작동.
     
    수정내역 :
        * YYYYMMDD : method_name / @@기능 추가
        * 2020MMDD [v.1.x.x]: configparser 모듈 추가
        * 2020MMDD [v.1.x.x]: logging 모듈 추가   
        ...                          
'''
 
import socket
import threading
import time
import os
import datetime
import ConfigParser
 
import logging
from logging import handlers
 
def read_config() :
    config = {}
    config = ConfigParser.ConfigParser()
    config.read('/file/to/path/conf.properties')
 
    '''
    conf값으로 불러올 변수 선언
    '''
 
def read_logging() :
#logging 설정
    LOG_FILENAME = 'mylog.log'
     
    '''
    Log 포맷과 log handler 작성 후 logging 객체에 부착
    '''
 
class clustertoremote_obj :
     
    #data_to_Remote : list
    def __init__(self, data_to_Remote) :
        self.a = a
        self.b = B
        '''
        Data Object 구성
        '''
 
class remotetokcluster_obj :
 
    #data_from_Remote : list (len : 9)
    def __init__(self, data_from_Remote) :
        self.a = data_from_Remote[0]
        self.b = data_from_Remote[1]
        '''
        Data Object 구성
        '''
 
def do_to_str(Array):
    s = '|'.join(Array)  
    return s
 
def file_write(remoteDo):
 
    file_path = REMOTE_DATA_DIR
    file_name = datetime.datetime.today().strftime('%Y%m%d%H%M') + '_request.txt'
    save_as = file_path+file_name
 
    f=open(save_as,'a')
    f.write(remotedo_to_str(remoteDo)+'\n')
    f.close()
    '''
    Return : True/False
    '''
 
def check_parameter(data):
    myLogger.info(' run parameter check module')
 
    #local variable init
    length='0000'
    val2='REQ'
    val3='000000000'
 
 
    # data의 정합성 체크
        '''
        수신받은 데이터 체크, ERROR 확인 및 분기처리
        Return : 수신받은 데이터의 DO
        '''
    #setting clusterDo
    ...
    clusterDo.valN = 'XXX'
 
    return do_to_str(clusterDo)
 
def handler(client_socket, addr, data) :
    myLogger.info(' run client handler')
 
    '''
    생성된 소켓을 실제로 처리하는 컨트롤러/핸들러 작성
    '''
    client_socket.sendall(return_msg)
 
def server_req(client_socket, addr) :
    myLogger.info('=====================================')
    myLogger.info(' The thread created successfully. The socket connected from %s', addr)
    '''
    소켓 생성후 연결되었을때, 처리하는 함수 작성
    '''
    # 작업 후 소켓 close
    client_socket.close()
    myLogger.info(' socket closed from client ')
    myLogger.info('=====================================')
 
def prepare_socket_recv() :
    global server_socket
 
    #create socket object
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    #error handle
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
    #bind server
    server_socket.bind((HOST,PORT))
 
    '''
    소켓 생성 및 스레드 처리.
    큐 관리.
    '''
 
if __name__ == "__main__" :
    myLogger.info(' RECV SERVER DAEMON START')
    prepare_socket_recv()
    server_socket.close()

 

송신용 클라이언트

송신용 데몬 - send_client.py
#!/usr/bin/python
#-*- coding: utf-8 -*-
'''
    Author : Henry Kim
    Date Created : 2020.xx.xx
    Date Last Modified : 2021.xx.xx
     
    File Version : 1.x.x
    Description :
        * Remote 에게 소켓통신 송신용 클라이언트
        * 동작 과정
            1. Remote로 부터 패킷전송 socket 생성 (send_client.py)
            2. Remote와 부터 패킷연결
            ...
            ...
            3.2 패킷연결 실패시, 해당 파일 재시도
     
        * 무한루프로 돌며 데몬형식으로 작동.
     
    수정내역 :
        * YYYYMMDD : method_name / @@기능 추가
        * 2020MMDD [v.1.x.x]: configparser 모듈 추가
        * 2020MMDD [v.1.x.x]: logging 모듈 추가
        * 2021MMDD [v.1.x.x]: send_file() / file 체크 로직 변경
        ...
'''
import socket
import threading
import time
import os
import datetime
import io
 
import shutil
import ConfigParser
 
import logging
from logging import handlers
 
def read_config() :
    config = {}
    config = ConfigParser.ConfigParser()
    config.read('/file/to/path/conf.properties')
     
    '''
    conf값으로 불러올 변수 선언
    '''
 
def read_logging() :
#logging 설정
    LOG_FILENAME = 'mylog.log'
     
    '''
    Log 포맷과 log handler 작성 후 logging 객체에 부착
    '''
 
 
def get_text(file_full_path):
    send_data = f.readline().replace('\n','')
    return combine_text(send_data)
 
def client_send(file_full_path):
 
    myLogger.info(' read file from.. %s ', file_full_path)
    send_data = get_text(file_full_path)
 
    myLogger.info(' Ready to send data %s', send_data)
    #socket 데이터는 utf-8 (바이트)형태로 송수신. encode 필요
    try :
        '''
        소켓 송신을 위한 내용 작성
        '''
 
    except Exception as e :
            '''
            예외처리 항목 설정
            '''
 
def prepare_socket_send(file_full_path) :
    myLogger.info(' the socket created')
    global to_remote_client_socket
    to_remote_client_socket= socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    while not conn:
        try :
             '''
            소켓 송신을 위한 내용 작성
            '''
        except Exception as e :
            myLogger.error(' connetion error %s ', e)
            '''
            예외처리 항목 설정
            '''
 
    to_remote_client_socket.close()
 
def send_file():
    isNewFile=False
    myLogger.info(' SEND CLIENT DAEMON START')
    while True :
        file_list= os.listdir(remote_RESULT_DIR)
        if len(file_list) > 0 :
            isNewFile = True
        else :
            isNewFile = False
 
 
        if isNewFile :
            '''
            무한루프 돌면서, 송수신 환경이 충족될 경우 소켓송수신 수행
            '''
 
        isNewFile=False
        #file 없을경우 쉼
        time.sleep(FILE_FOUND_SLEEP_TIME)
 
 
def run():
    send_file()
 
if __name__ == "__main__" :
    run()

 

메소드 설명

  • AUTHOR
#!/usr/bin/python
#-*- coding: utf-8 -*-
'''
    Author : Henry Kim
    Date Created : 2020.xx.xx
    Date Last Modified : 2021.xx.xx
     
    File Version : 1.x.x
    Description :
        * Remote 에서 보내주는 데이터 소켓통신 수신용 서버
        * 동작 과정
            1. Remote로부터 패킷수신 하기 위해 socket bind (recv_server.py)
            2. Remote로부터 패킷수신
            ...
            ...
            3.2 패킷수신 실패시, Remote server에서 별도처리
     
        * 무한루프로 돌며 데몬형식으로 작동.
     
    수정내역 :
        * YYYYMMDD : method_name / @@기능 추가
        * 2020MMDD [v.1.x.x]: configparser 모듈 추가
        * 2020MMDD [v.1.x.x]: logging 모듈 추가   
        ...                          
'''

형상관리 툴을 사용할 수 없는 환경에서 소스코드에대한 정보를 기입하는, 일종의 Abstract 같은 존재입니다.

AUTHOR, Date Created, Last Modified 등 기본정보를 적어주고

형상 변환에 따라 버전정보와 수정내역, 기본적인 소스구동원리(Description)을 작성하여줍니다. 

 

  • def read_logging()
def read_logging() :
#logging 설정
    LOG_FILENAME = 'mylog.log'
 
    myLogFormatter = logging.Formatter('%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] %(message)s')
    myLogHandler = handlers.TimedRotatingFileHandler(filename=LOG_DIR+LOG_FILENAME, when='midnight', interval=1, encoding='utf-8')
    myLogHandler.setFormatter(myLogFormatter)
    myLogHandler.suffix = "%Y%m%d"
 
    myLogger = logging.getLogger('server')
    myLogger.setLevel(logging.INFO) # Logging level
    myLogger.addHandler(myLogHandler) # Log 모듈에 myLogHandler 부착

myLogFormatter 는 logging 모듈에서 사용할 포맷을 지정한다. 시간, 수행한 py 스크립트명, 발생위치, 메세지등 형식 지정 가능. 

myLogHandler는 Log 저장경로와, 인코딩 설정, 자정기준으로 새로운파일 생성여부를 지정하여준다.

myLogHandler의 포맷에 myLogFormatter  장착하여 주고 myLogger 에 기본 logging 레벨을 지정한뒤 myLogHandler를 부착하여 준다.

 

  • def do_to_str(Array)
def do_to_str(Array):
    s = '|'.join(Array)  
    return s

입력받은 Array를 구분자 | 로 구분된 텍스트로 변환해주는 함수.

 

  • def file_write(remoteDo)
def file_write(remoteDo):
 
    file_path = REMOTE_DATA_DIR
    file_name = datetime.datetime.today().strftime('%Y%m%d%H%M') + '_request.txt'
    save_as = file_path+file_name
 
    f=open(save_as,'a')
    f.write(remotedo_to_str(remoteDo)+'\n')
    f.close()
 
    if os.path.isfile(save_as):
        myLogger.info(' the file saved success as %s', save_as)
        return True
    else :
        myLogger.error(' can not create %s', save_as)
        return False

Remote서버로부터 소켓으로 전달받은 데이터는 후행 W/F를 위해 txt 파일 형태로 저장해야합니다.

파이썬에 내장된 open함수를 이용해 로컬 FS에 저장 한 후, isfile()을 사용하여 파일 저장여부를 반환합니다.  

 

  • def check_parameter(data)
def check_parameter(data):
    myLogger.info(' run parameter check module')
 
    #local variable init
    length='0000'
    val2='REQ'
    val3='000000000'
 
    finalVal='\n'
 
 
    clusterDo = clustertoRemote_req_obj([length,val2,val3])
 
    numofpipe=data.count('|')
 
    # data의 정합성 체크
    if numofpipe == NUM_OF_COLUMN-1 :
        messageValue ='200'
        data_list = data.split('|')
        remoteDo = remotetocluster_obj(data_list)
        remoteProcessNo = remoteDo.remoteProcessNo
 
        #check request type 400
        if remoteDo.requestType != 'REQ' :
            myLogger.warn(' requestType :  %s' , remoteDo.requestType)
            messageValue = '400'
 
        '''
        수신받은 데이터 체크, ERROR 확인 및 분기처리
        '''
 
 
    #setting clusterDo
    clusterDo.length = '%04d'%len('|'.join([length,val1,val2,...valN])+finalVal)
    clusterDo.requestType = 'XXX'
    ...
    clusterDo.valN = 'XXX'
 
    return do_to_str(clusterDo)

check_parameter는 Remote ↔ 클러스터간 규격서에 어긋나는 데이터형식이 들어올경우 분기처리를 해주는 함수입니다.

messageValue 변수는 클러스터의 정상수신여부/오류정보등을 담고있는 응답코드값입니다.

리턴타입은 클러스터에서 Remote로 보내기 위한 Data Object 입니다.

 

  • def handler(client_socket, addr, data)
def handler(client_socket, addr, data) :
    myLogger.info(' run client handler')
 
    return_msg = check_parameter(data)
     #socket 데이터는 utf-8 (바이트)형태로 송수신. encode 필요
    return_msg = return_msg.encode('utf-8')
 
    myLogger.info(' cluster to remote send msg :  %s' , return_msg)
    client_socket.sendall(return_msg)

socket data를 실제 send하는 함수입니다.

아래에서 설명 할 server_req의 종속함수 이며, recv_server.py 함수 호출의 End Point입니다.

 

  • def server_req(client_socket, addr)
def server_req(client_socket, addr) :
    myLogger.info('=====================================')
    myLogger.info(' The thread created successfully. The socket connected from %s', addr)
    data = client_socket.recv(1024)
    data = data.decode('utf-8')
    myLogger.info(' data recieve success. DATA : %s', data.encode('utf-8'))
 
    data = data.replace('\n','') #\n 문자 제거(cluster Daemon에서 필요 없는 parameter)
    myLogger.info(' remove \n for inner process logic. DATA : %s', data.encode('utf-8'))
 
    if len(data) == 0 :
        myLogger.warn(' Data from client is Empty')
    #socket handler
    handler(client_socket, addr, data)
 
    client_socket.close()
    myLogger.info(' socket closed from client ')
    myLogger.info('=====================================')

Remote서버로 전송할 socket의 환경설정을 담당하며, 소켓방식으로 데이터를 send하는 handler()함수를 실행합니다

아래에 설명할 prepare_socket_recv() 함수의 종속 함수이며, server_req()는 스레드로 생성되어 병렬처리됩니다

 

  • def prepare_socket_recv()
def prepare_socket_recv() :
    global server_socket
 
    #create socket object
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    #error handle
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
    #bind server
    server_socket.bind((HOST,PORT))
 
    myLogger.info('socket bind done %s' ,server_socket)
    server_socket.listen(MAX_CLIENT_NUM)
    myLogger.info('socket wait listen %s' ,server_socket)
    while True :
        try :
            client_socket, addr = server_socket.accept()
            th_server_req = threading.Thread(target=server_req, args =(client_socket, addr))
            th_server_req.daemon=True
            myLogger.info('The thread allocated for server request from remote client. thread-id : %s', th_server_req)
            th_server_req.start()
 
        except KeyboardInterrupt as k :
            myLogger.error(' binded server socket closed by Interrupt %s', k)
            server_socket.close()
            break
 
        except Exception as e :
            myLogger.error(' connetion %s', e)
            server_socket.close()
            break

socket.socket()으로 소켓을 생성하고 .bind()로 실제 송신할 준비를 합니다.

통신 규격 조건에 맞게 listen()을 사용하여 소켓이 대기할 Queue사이즈를 지정하여 줍니다

 threading.Thread를 통해 멀티스레드를 실행하며, 소켓 통신을 위한 server_req 를 스레드로 실행합니다.

이때 try/exception 예외처리를 통해 소켓통신의 온전한 수행을 보장합니다.

 

 

 

 

  • def client_send(file_full_path)
def client_send(file_full_path):
 
    send_data = get_text(file_full_path)
 
    #socket 데이터는 utf-8 (바이트)형태로 송수신. encode 필요
    try :
        to_remote_client_socket.sendall(send_data.encode('utf-8'))
        #응답데이터 수신
        recv_data = to_remote_client_socket.recv(1024) #byte(utf-8)
 
        #성공시
        if recv_data.split('|')[-1] == '200' :
            remove_file(file_full_path)
 
        #수신안되면 종료하기
        else :
            failed_file(file_full_path)
 
    except Exception as e :
            myLogger.error(' to remote socket send error %s ', e)

client_send()은 송신용 데몬에서 소켓을 발송하는 함수입니다.

인코딩, 데이터의 크기 등 소켓의 기본환경을 설정하여주고, 로컬에서 생성된 data 가 발송준비가 되었는지 최종 체크를 진행합니다.

보낼 data에 이상이 없으면 소켓을  통해 보내주며, 발송 성공/실패에 따라 로컬파일 분기처리를 합니다.

아래 설명할 prepare_socket_send() 함수의 종속함수 입니다.

 

  • def prepare_socket_send(file_full_path) 
def prepare_socket_send(file_full_path) :
    global to_remote_client_socket
    to_remote_client_socket= socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    conn=False
    conn_cnt=0
    while not conn:
        try :
            to_remote_client_socket.connect((remote_HOST,PORT))
 
            th_client_send = threading.Thread(target=client_send, args=(file_full_path, ))
            th_client_send.daemon=True
            myLogger.info(' The thread allocated for client send to remote server. thread-id : %s',th_client_send)
            th_client_send.start()
            conn = True
        except Exception as e :
            '''
                예외처리 항목 설정
            '''
 
    to_remote_client_socket.close()

prepare_socket_send() 함수는  client_send()의 스레드를 생성하여 송신용 소켓의 관리를 하는 함수입니다.

상대측과  데이터를 받을 준비가 되었는지 체크를 하며, 정상 연결 되었을경우 하위 스레드를 생성합니다

실제 소켓 송신은  client_send()에서 수행합니다. 이 때 발송할 데이터의 경로를 받아와(file_full_path) 파라미터로 넘겨줍니다.

 

  • def send_file()
def send_file():
    while True :
        file_list= os.listdir(remote_RESULT_DIR)
        if len(file_list) > 0 :
            myLogger.info('=====================================')
            myLogger.info(' File Founded! : %s ',file_list)
            isNewFile = True
        else :
            isNewFile = False
 
 
        if isNewFile :
            #가장 오래된 파일 send
            file_name = os.listdir(remote_RESULT_DIR)[-1]
            file_full_path = remote_RESULT_DIR+file_name
            prepare_socket_send(file_full_path)
 
            file_list= os.listdir(remote_RESULT_DIR)
            if len(file_list) > 0 :
                continue
 
        isNewFile=False

송신용 데몬은 무한루프를 돌면서 remote 서버와 끊임없이 연결을 시도합니다.

단, 네트워크트래픽 부하를 방지하기위해 규격서에 정의된 sleep 타입이 존재합니다.

기본적으로 로컬FS에 저장된 텍스트파일(전송을 위해 분석이 끝난 파일)이 존재하는지도 같이 체크합니다.

상대측 서버와 connection을 맺고 로컬파일이 준비 되었다면 prepare_socket_send() 소켓 송신모듈을 호출합니다

 

참고자료

 

https://docs.python.org/3/howto/sockets.html

http://www.kocw.net/home/search/kemView.do?kemId=1169634

https://codezone4.wordpress.com/2013/02/24/php-socket-programming-basics/

+ Recent posts