프로그래밍/Let's Share it

비동기(OVERLAPPED I/O, IOCP, Thread Pool) IPC PIPE Class 공유

초록생선 2011. 12. 14. 16:24

일반적으로 .exe간 통신을 위해서 PIPE를 흔히 사용하곤 합니다.
보통 그런 경우, 수신 Part(즉, Server side)에서는 Thread를 만들기도 합니다.
그래서, 보통의 PIPE skeleton은 thread 생성 코드가 포함됩니다.
혹은, 수신 Part에서는 지속적인 Loop를 돌기때문에, 어쩔수 없이 Caller는 Thread를 만들어 호출해야 합니다.
즉,


와 같이 호출자체가 block되는 구조이기 때문에, caller는 다른 작업을 진행하기 위해
thread를 만들어야 합니다.

이것을 방지하기 위해, 본 IPC PIPE는 IOCP와 Thread pool을 사용하였으며,
물론, 내부적으로도 CreateThread나 _beginthreadex를 전혀 사용하지 않았습니다.

즉, 다음과 같은 구조가 됩니다.

즉, 모든것은 Primary thread에서 진행되며, block과정은 없습니다.
그리고, 접속이 들어왔을 때, 그때 OS에서 생성해준 Thread에서 우리는
PIPE 작업을 하게 됩니다. 그러니, CreateThread나 _beginthreadex를 사용하지
않게 되죠.

이러한 구현을 위해 필요한 것이 Overlapped i/o와 ::RegisterWaitForSingleObject 입니다.
즉, 위 block되는 함수들 특징이, ::ReadFile / ::WriteFile를 사용할때, Overlapped I/O를
사용하지 않기 때문입니다.
그리고, 접속 체크를 위해 WaitForSingle(*Multiple)Object를 사용하면 block됩니다.
만일, ::RegisterWaitForSingleObject를 사용한다면, I/O 완료(Completion)때 호출될 callback만
등록하고 그 함수는 pass되는데, 그렇게 되면 완전한 non-block Server가 구현됩니다.

ConnectNamedPipe는 PIPE 접속을 기다리는 함수로, Overlapped I/O를 사용하지 않으면,
Block됩니다. 그래서 lpOverlapped를 만들어 전달하면 Block되지 않습니다.
그러면, 해당 I/O가 완료(Completion)된 싯점을 어떻게 알게 되냐?가 문제됩니다.
일반적으로, Overlapped I/O의 event를 ::WaitForSingleObject(...)로 진행하게되는데,
그러면 또 자체적인 block함수를 만들게 됩니다.
따라서, 해당 싯점을 ::RegisterWaitForSingleObject(...)를 사용하여 비동기적으로 알 수 있다면,
문제를 완벽하게 해결할 수 있습니다.

즉,
// 만드시 ManualReset을 FALSE로 해야 한다.
overlap.hEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
::RegisterWaitForSingleObject(&m_hWait, overlap.h, OnConnect, ....);
::ConnectNamedPipe(hPipe, &overlap);

// block되지 않고 넘어감.
...
...
...
// ConnectNamedPipe I/O 완료(Completion)시 호출받음.
// 물론, OS에서 관리하는 Thread Pool내에서 호출 받음 (자세한건 RegisterWaitForSingleObject flag 참고)
VOID CALLBACK OnConnect(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
{
...
}
와 같은 구조가 됩니다.

물론, ConectNamedPipe(...)이후의 코드가 다른 Thread에서 생성되기 때문에,
디버깅이 힘든 단점이 있습니다. 허나, 프로그램의 효율을 높이기 위해서는 어쩔수 없는 선택이죠.
왜 이것이 효율이 높냐면, OS의 Thread Pool을 사용하기 때문입니다.
즉, 만들어진 Thread를 계속 재활용하기 때문에, Thread 생성의 부하가 줄어듭니다.
그리고, Event등 I/O 수신을 위해 불필요하게 생성되는 Thread의 개수를 줄일 수 있기 떄문입니다.
즉, 만약, Connection이 자주 발생하지 않는 프로그램인 경우, 시작부터 Thread를 생성하는것이
낭비이기 때문입니다. (* 참고로, ::RegisterWaitForSingleObject를 호출하면 최소 2~3개의 Thread가
생성됩니다. 만일, Connection의 개수가 1~2개를 넘기지 않는다면, 전통적 방법을 생각해 보세요)

이제, CPipeServer / CPipeClient / CPipeData를 소개해야 겠습니다.
물론, CCL 라이센스이기 때문에, 상업적 이용은 불가하며, 컨텐츠 변경은 가능합니다.
상업적 이용을 위해서는 저에게 간단히 contact해 주시기 바랍니다.

자세한 코드는 Pipelpc.zip을 참고하시기 바랍니다.

DWORD CPipeServer::StartService
[IN] LPCTSTR lpszChannelId
[IN] PFN_OnRequest pfnOnRequest
[OPTIONAL IN] LPVOID pContext

Service를 시작합니다.
물론, 바로 리턴이 이뤄지며, 해당 과정에서 코드상 Thread 생성 부분은 없습니다.
(::RegisterWaitForSingleObject(...)호출로 인해 기본적인 2~3개 생성됨. 이는 OS에서 관리됨)

ChannelID는 통신을 위한 channel명이 됩니다.
이미 Channel이 열려있다면 실패하게 됩니다. (PIPE instance를 1개로 설정함)
물론, 해당 코드를 좀 수정하면, 여러개의 instance 생성이 가능합니다.

Win32 ErrorCode가 리턴됩니다.

pfnOnRequest는
VOID OnRequest(IN CPipeData& cInPipeData, OUT CPipeData& cOutPipeData, IN LPVOID pContext)
형태로, Caller에서 전달한 데이터(cInPipeData)와 Caller에 전달할 데이터(cOutPipeData), 그리고,
StartService의 pContext를 전달
받는 함수 포인터입니다.
즉, Caller와 접속이 이뤄지면, 해당 callback이 호출된다고 보면 됩니다.
물론, 다른 임의의 Thread에서 호출받음을 명심하세요.

pContext는 callback 함수에 전달될 data 입니다.

본 함수가 non-block인 관계로, 언제까지 Listen하냐면,
StopService 호출 혹은 파괴자 호출입니다.
즉, Class Instance가 유지되는 동안은 Listen이 이뤄집니다.

VOID CPipeServer::StopService

시작중인 서비스를 중단시킵니다.
현재 진행중인 callback이 있다면, 종료때까지 block됩니다.

BOOL CPipeClient::AddParam
[IN] LPCTSTR lpszParamName
[IN] LPBYTE pBuf
[IN] size_t dwCbSize

Server에 전달할 Parameter들을 추가합니다.
물론, CallService하기전에 이뤄져야 합니다.
성공시 TRUE, 실패시 FALSE가 리턴됩니다.

lpszParameterName은 변수명을 의미합니다.
*|=등의 문자는 불허하며, 중복으로 들어가는 경우도 불허합니다.

pBuf는 전달될 BYTE buffer입니다.
dwCbSize는 전달할 buffer의 BYTE 크기 입니다.

DWORD CPipeClient::CallService
[IN] LPCTSTR lpszChannelId
[OUT] CPipeData& cOutPipeData
[IN OPTIONAL] DWORD dwConnectTimeoutSec

Service를 호출합니다.
서비스 응답까지 Block됩니다.
해당 서비스가 이전의 요청사항을 처리중 혹은 아직 실행이 안되었을 때,
Connect Timeout 만큼 기다려 줍니다. 해당 값의 단위는 초(sec)입니다.
그리고 Win32 ErrorCode를 리턴합니다.

lpszChannelId는 통신할 channel 명을 의미합니다.
cOutPipeData는 서버로부터 응답받은 데이터입니다.
dwConnectTimeoutSec는 접속을 위한 Timeout 값(초)입니다. 이는 접속을 위한 시간값이지,
CallService 실행시간을 의미하진 않습니다.

BOOL CPipeData::SetParam
[IN] LPCTSTR lpszName
[IN] LPBYTE pData
[IN] size_t dwCbSize

Parameter를 설정합니다.
이전값이 있는 경우, 실패합니다.
성공시 TRUE, 실패시 FALSE를 리턴합니다.

lpszName은 Parameter의 이름을 설정합니다. *|=등의 문자는 불허합니다.
pData, dwCbSize는 들어갈 data와 크기입니다.

해당 호출은, 따로 메모리를 할당하여 보관하므로,
Caller는 해당 함수 호출후, 전달 데이터를 파괴해도 무방합니다.

BOOL CPipeData::GetParamPtr
[IN] LPCTSTR lpszName
[OPTIONAL OUT] LPBYTE* ppData
[OPTIONAL OUT] size_t* pdwSize

Parameter를 구합니다.
값이 존재할시 TRUE를, 그 이외의 경우엔 FALSE를 리턴합니다.

lpszName은 구할 Parameter 이름입니다.
ppData는 전달받을 Data Pointer입니다. NULL 전달 가능합니다. 해당 Pointer는 내부에서 사용하는 메모리이므로,
read-only로 사용하시기 바랍니다.
pdwSize는 Data의 Byte크기를 전달합니다. NULL 전달 가능합니다.

VOID CPipeData::ClearAllParam

INT CPipeData::GetCount

BOOL CPipeData::GetName
[IN] INT nIndex
[OUT] LPTSTR lpszName
[IN] size_t dwCchLength

ClearAllParam은 모든 data를 삭제합니다.

GetCount는 Parameter의 개수를 구합니다.

GetName은 각 Index의 Parameter 이름을 구합니다.

VOID CPipeData::ToByteAlloc
VOID CPipeData::FromByte
VOID CPipeData::ToBase64Alloc
VOID CPipeData::FromBase64
VOID CPipeData::FreeAlloc

위 함수는 직접적으로 사용하지 않을 수 있습니다.
Parameter들을 Byte Stream으로 변환하는데 쓰이는 함수입니다.
즉, Pipe 통신시 실제로 위 함수를 이용하여 만들어진 Byte Stream으로 송/수신하게 됩니다.

물론, 보안을 위해 Base64를 간단히 이용하였습니다.
Pipe는 특징상 손쉽게 남이 훔쳐볼 수 있습니다.
그래서 보안상, 최소한의 손쉬운 인코딩(물론, 암호화는 아닙니다)을 가져온게 Base64입니다.
Base64는 ATL을 사용한다면 <atlenc.h>의 함수를 그대로 쓰면 되나,
혹시나 ATL을 사용하지 않는 project를 위해 ATL함수를 그대로 가져왔습니다.
(혹시 Microsoft ATL에 licensing이 있는지 모르겠습니다. 문제가 될때 삭제하겠습니다.)

물론, 알려진 Base64 또한 의미가 없습니다. 쉽게 잘 풀리니깐요.
그래서, Base64이되, 변환 Table을 변경하면, 쉽게는 뚤리지는 않습니다.

아래와 같이 수정 시나리오를 따르세요.
그러면 분석하기 힘든 바이너리 데이터로 송/수신하게 됩니다.
(물론 인코딩된거지 암호화된게 아니기 때문에, 완벽한 보안은 아닙니다.)

BOOL CPipeData::Base64Encode(
...
 static const char s_chBase64EncodingTable[64] = {
  1, 2, 3, ... };
--> (1)

// *szDest++ = '.';
*szDest++ = 111; --> (2)

int CPipeData::DecodeBase64Char(unsigned int ch) --> (3)
{
 int i = 0;
 static const char s_chBase64DecodingTable[64] = {
  1, 2, 3, ... };

 for (i=0; i<64; i++)
 {
  if (s_chBase64DecodingTable[i] == ch)
  {
   return i;
  }
 }
 return -1;
}


(1)에서 변환 table을 만듭니다. 0, 10, 13 값은 제외하도록 합니다. 물론 255보다 작아야 합니다.
되도록 값을 뒤 섞으십시요.
(2)에서 '.' 부분을 다른 값으로 변경합니다. 해당 값은 (1)의 table에 없어야 합니다.
(3)에서는 기존 코드를 변경합니다. (1) 변환 table을 가져오도록 합니다.

사용예는, 첨부된 Pipelpc.zip을 참고하시기 바랍니다.
Server/Client로 나눠진 프로젝트가 있으며, Common 경로에 Pipe 코드가 있습니다.
각 프로젝트 Debug에 .exe가 있습니다.(Static Link라 재배포팩 관련없습니다.)
Debug인 경우 printf되는 특징이 있어, Debug 빌드를 포함시켰습니다.

[INFO] Success to start service, \\.\pipe\mychannel
[INFO] Start request, \\.\pipe\mychannel
vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
[INFO] ****************
[INFO] Start DUMP Param
[INFO] ****************
[INFO][0] Name=foobar, Size=1, Data(UTF8)=
[INFO][1] Name=input_param, Size=58, Data(UTF8)=T
[INFO] *****************
[INFO] Finish DUMP Param
[INFO] *****************
[INPUT] TO_SERVER_SEND:call_pid=1764
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[INFO] Finished request,
\\.\pipe\mychannel
[INFO] Start request, \\.\pipe\mychannel
vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
[INFO] ****************
[INFO] Start DUMP Param
[INFO] ****************
[INFO][0] Name=foobar, Size=1, Data(UTF8)=
[INFO][1] Name=input_param, Size=58, Data(UTF8)=T
[INFO] *****************
[INFO] Finish DUMP Param
[INFO] *****************
[INPUT] TO_SERVER_SEND:call_pid=1764
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

[INFO] ****************
[INFO] Start DUMP Param
[INFO] ****************
[INFO][0] Name=hello, Size=40, Data(UTF8)=m
[INFO] *****************
[INFO] Finish DUMP Param
[INFO] *****************
[RESPONSE] my resoponse : 1808
[INFO] ****************
[INFO] Start DUMP Param
[INFO] ****************
[INFO][0] Name=hello, Size=40, Data(UTF8)=m
[INFO] *****************
[INFO] Finish DUMP Param
[INFO] *****************
[RESPONSE] my resoponse : 1808

Server.exe를 실행하고 Client.exe를 실행하면 각각 위와 같습니다.
2개의 Process가 정보를 서로 교환합니다.

코드는 다음과 같습니다.