From IdeA thinKING
요약
기존의 MemPool 클래스들은 메모리 크기와 성능에서 최소한의 overhead를 가지도록 하는것이 목적이었습니다. 이번 글에서는 overhead를 줄이는 목적이 아닌 안정적인 그리고 debugging이 용이한 구조를 가지는 MemPool을 구현해보겠습니다.
구현
기존 MemPool과의 차이점
먼저 기본적인 MemPool 구현에 관한 사항은 이전과 동일합니다. 이전 글들을 참고하세요. 이번장에서는 기존 MemPool과의 차이점에 대해 알아봅니다.
Debugging 정보를 위한 데이터 할당
기존에는 사용자에게 할당해줄 메모리와 free된 메모리에서 linked list를 유지하기 위한 데이터 영역을 서로 중첩해서 사용했었습니다. 따라서 사용자에게 할당된 메모리에는 더이상 MemPool만의 어떤 데이터도 저장할 수 없없습니다.
이번 MemPool에서는 실제 사용자에게 할당해줄 메모리 영역의 앞쪽에 MemPool에서 사용할 데이터를 추가로 할당하도록 수정되었습니다. 또한 이 구조는 MemPoolClass, MemPoolShare, MemPoolSize등에서 모두 사용하므로 MemPoolHelper.hpp라는 파일에 선언하여 공통으로 사용합니다.
const int MAGIC_CODE = 0xDEADC0DE;
template <class T>
struct Chunk
{
int magic;
int count;
const char* new_file;
int new_line;
const char* del_file;
int del_line;
time_t new_time;
Chunk* next;
Chunk* prev;
union
{
double not_used; // to avoid memory alignment problem
char buf[sizeof(T)]; // let compiler make padding
};
};
위의 코드에서 MAGIC_CODE는 실제 이 영역이 MemPool이 할당한 영역인지를 check하기 위해 사용합니다. count는 최초 0으로 셋팅되고 new될때 +1, delete될때 -1하여 혹시 두번 delete되지는 않는지들을 체크하는데 사용됩니다.
실제 사용자가 사용하게 될 영역은 buf 로 이 배열의 크기는 template parameter로 입력받은 type의 크기입니다. double로 선언된 not_used 변수는 사용자에게 할당될 메모리 영역의 alignment를 위해 선언되었습니다. 이 변수에 의해 buf 영역은 항상 double이 시작할 수 있는 영역과 같아지게 됩니다. ((실제 메모리 alignment가 맞지 않는 경우 어떤 processor에서는 성능저하가 일어나고(Intel등) 또 다른 processor에서는 core가 발생합니다(Sparc등).))
메모리 영역 관리를 위한 방법을 FIFO에서 LIFO로 변경
위에서 알아본 Chunk struct를 사용하기 위해 아래와 같은 helper function들을 같은 파일내에 정의하였습니다.
// it is implemented as a separate function instead of constructor
// to make Chunk POD type.
// if it's not POD type, offsetof operation is warned by compiler.
template <class T>
void initChunk(Chunk<T>& c)
{
c.magic = MAGIC_CODE;
c.count = 0;
c.new_file = 0;
c.new_line = 0;
c.del_file = 0;
c.del_line = 0;
c.new_time = 0;
c.next = 0;
c.prev = 0;
}
// make new list that is linked circularly
template <class T>
Chunk<T>* makeList(std::size_t OBJS)
{
typedef Chunk<T> Chunk;
Chunk* block = new Chunk[OBJS];
// init newly allocated block
for (int i = 0; i < OBJS; ++i) {
initChunk(block[i]);
}
// init next, prev pointer
for (int i = 0; i < OBJS - 1; ++i) {
block[i].next = &(block[i + 1]);
block[i + 1].prev = &(block[i]);
}
// make circular
block[0].prev = &(block[OBJS - 1]);
block[OBJS - 1].next = &(block[0]);
return block;
}
// return new list header
template <class T>
void insertListToList(Chunk<T>*& target, Chunk<T>* source)
{
if (source == 0) return;
if (target != 0) {
std::swap(source->prev->next, target->prev->next);
std::swap(source->prev, target->prev);
}
target = source;
}
// return dettached item, parameter list will be updated
template <class T>
Chunk<T>* deleteItemFromListHead(Chunk<T>*& list)
{
if (list == 0) return 0;
Chunk<T>* ret = list;
if (list->prev == list) {
// for the that list have only one item
list = 0;
}
else {
list->next->prev = list->prev;
list->prev->next = list->next;
list = list->next;
}
return ret;
}
// return dettached item, parameter list will be updated
template <class T>
void appendItemToListTail(Chunk<T>*& list, Chunk<T>* item)
{
if (item == 0) return;
if (list == 0) {
item->next = item;
item->prev = item;
list = item;
}
else {
list->prev->next = item;
item->prev = list->prev;
item->next = list;
list->prev = item;
}
}
template <class T>
void deleteItemFromList(Chunk<T>*& list, Chunk<T>* item)
{
if (item->prev == item) {
list = 0;
}
else {
item->prev->next = item->next;
item->next->prev = item->prev;
}
if (list == item) {
list = item->next;
}
}
제일 위의 initChunk와 같은 함수는 Chunk struct에 생성자로 구현하는 것이 사용하는데 더 편리하겠으나 주석에 써있는바와 같이 생성자를 만들면 컴파일러가 그 struct를 POD가 아닌것으로 간주하여 offsetof등의 매크로 사용시 경고를 발생시킬 수 있으므로 따로 함수로 작성하였습니다.
위의 코드에서 알 수 있는 것처럼 실제 MemPool 클래스에 head와 tail 포인터 두개를 저장하는 대신 하나의 head 포인터로 관리가 가능하도록 ring 형태의 doubly linked list로 구현하였습니다. 즉, head와 tail이 연결되도록 구현하였습니다.
그리고 이전에는 singly linked list를 사용하였으나 이제 list의 중간에서 item이 지워지더라도 list가 계속 유지되어야 하기 때문에 doubly linked list를 사용하였습니다. List의 중간에서 item이 지워지는 경우는 현재 new 되어 사용중인 Chunk가 delete되는 경우입니다. 기존의 MemPool들은 일단 new 되어 사용중인 Chunk에 대해서는 관리하지 않았으나 이번 MemPool에서는 이 list를 관리하여야 하기 때문에 이러한 구조가 필요합니다.
또한 이전에는 성능상의 이유로 FIFO 구조를 사용하였습니다. 가장 최근에 사용된 영역이 cache에 남아 있을 가능성이 크기 때문에 성능에 도움이 될것이라고 생각되었기 때문입니다. 물론 이 생각엔 변함이 없습니다만 실제 multi-thread 환경에서 사용시에 코딩상의 오류로 인해 지워진 메모리 영역에 access하는 문제들이 많이 발생하였습니다. 물론 근본 원인을 찾아 수정하여야 하겠지만 설사 실제 동작시에 이 문제가 발생하더라도 최대한 안정성을 가지기 위해 LIFO 구조로 수정하였습니다. 즉, 최근에 delete된 메모리는 당분간 new 되어 사용되지 않을 것이기 때문에 이 기간에 설사 코딩상의 오류로 인해 이 메모리에 접근하더라도 다른 코드에서 사용중인 영역이 아니기 때문에 큰 문제가 발생하지는 않게 됩니다. 물론 운이 좋으면 말이죠. :-)
파일 이름과 라인 저장을 위한 mp_new, mp_delete 매크로 추가
위의 Chunk struct에서 볼 수 있듯이 MemPool의 데이터 영역에는 파일 이름과 라인을 저장하기 위한 변수들이 마련되어 있습니다. 하지만 이를 사용하기 위해서는 일반 ::operator new(size_t s), ::operator delete (void* p) 대신에 ::operator new(size_t s, const char*, int ), ::operator delete (void* p, const char*, int) 가 호출되어야 합니다. 이를 쉽게 사용할 수 있도록 하기 위해 아래와 같이 매크로를 작성하였습니다.
#ifndef _MP_NOUSE
# define mp_new(T) new (__FILE__, __LINE__) T
# define mp_delete(t) t->operator delete (t, __FILE__, __LINE__)
#else
# define mp_new(T) new T
# define mp_delete(t) delete t
#endif
이 매크로를 사용하려면 기존의 코드를 다음과 같이 수정하면 됩니다.
// 기존
A* a = new A;
A* a = new A(3, 5);
delete a;
// 변경
A* a = mp_new(A);
A* a = mp_new(A)(3, 5);
mp_delete(a);
MemPoolClass
그럼 이제 가장 간단한 MemPoolClass를 살펴보겠습니다. 이전의 MemPool과 사용법은 동일하며 선언 역시 동일합니다. 다만 실제 작업을 하는 함수를 do_new, do_delete 라는 private helper function으로 따로 구현하였습니다.
template <
class T,
int OBJ_IN_A_BLOCK = 50,
template <class> class LockModel = NoLock
>
class MemPoolClass : public LockModel<MemPoolClass<T, OBJ_IN_A_BLOCK, LockModel> >
{
typedef detail::Chunk<T> Chunk;
typedef typename MemPoolClass::Lock Lock;
public:
static void* operator new (std::size_t s)
{
assert(sizeof(T) == s);
return do_new(0, 0);
}
static void* operator new (std::size_t s, const char* file, int line)
{
assert(sizeof(T) == s);
return do_new(file, line);
}
static void operator delete (void* p, std::size_t )
{
do_delete(p, 0, 0);
}
static void operator delete (void* p, const char* file, int line)
{
do_delete(p, file, line);
}
// see C++ Coding Standards 45
static void operator delete (void* p, std::size_t , const char* file, int line)
{
do_delete(p, file, line);
}
그럼 이제 do_new 와 do_delete 함수를 살펴볼까요?
do_new
먼저 do_new 입니다. Debugging 정보 기록을 위한 라인들과 freed Chunk가 부족할때 reserve 하는 부분을 빼면 실제 작업은 단 두줄입니다. detail::deleteItemFromListHead는 freed_ list의 head 부분에서 Chunk를 하나 꺼내는 것이고 detail::appendItemToListTail은 used_ list의 tail 부분에 이 Chunk를 넣는 것입니다. 사실 used_ list에는 head에 넣든 tail에 넣든 상관없습니다. 언제 used_ list에 들어갔든지 사용자가 delete 하면 list에서 빠지는 것이기 때문입니다. 하지만 freed_ list는 위에서 살펴보았듯이 LIFO 구조를 사용하기로 하였기 때문에 head에서 꺼냈으므로 나중에 넣을때는 반드시 tail에 넣어주어야 합니다.
꺼낸 Chunk의 buf 멤버 변수의 포인터를 리턴하며 사용자는 이 영역을 사용하게 됩니다.
static void* do_new(const char* file, int line)
{
Lock lk; lk;
if (freed_ == 0) {
reserve(true);
}
Chunk* ret = detail::deleteItemFromListHead(freed_);
detail::appendItemToListTail(used_, ret);
++currentObj_;
++(ret->count);
ret->new_time = time(0);
// set file info
ret->new_file = file;
ret->new_line = line;
return ret->buf;
}
do_delete
다음으로 do_delete 입니다. 이 함수 역시 debugging 정보 기록을 위한 라인들을 빼면 단 두줄이 남습니다. detail::deleteItemFromList 는 used_ list에서 Chunk를 빼줍니다. 이 함수의 구현을 위해 doubly linked list가 필요했습니다. detail::appendItemToListTail 는 freed_ list의 tail에 Chunk를 다시 넣어줍니다. 이것으로 MemPool의 LIFO 구조가 구현되었습니다.
static void do_delete(void* p, const char* file, int line)
{
const char* p_file = file;
int p_line = line;
if (p_file == 0) {
p_file = "?";
p_line = 0;
}
Lock lk; lk;
// we could use offsetof macro, but we didn't to make code portable.
Chunk* c = ((Chunk*)((char*)(p)-(unsigned long)(&(((Chunk*)0)->buf))));
// do check
if (c->magic != detail::MAGIC_CODE) {
MP_ERROR("##@@ occurred: " << p_file << "(" << std::dec << p_line << ")",
c
);
return;
}
--(c->count);
if (c->count != 0) {
MP_ERROR("##@@ occurred: " << p_file << "(" << std::dec << p_line << ")",
c
);
return;
}
c->del_file = file;
c->del_line = line;
detail::deleteItemFromList(used_, c);
detail::appendItemToListTail(freed_, c);
--currentObj_;
}
reserve
마지막으로 do_new에서 살펴보았던 reserve 함수에 대해 알아보겠습니다. 역시 두줄이군요. :-) 간단히 detail::makeList 함수를 사용하여 Chunk들의 block을 할당한 후 이 list를 기존의 freed_ list에 추가합니다. detail::makeList는 함수내에서 Chunk들의 block을 doubly linked list로 만들어주기 때문에 이 함수 호출 후 단순히 기존의 freed_ list에 추가만 하면 됩니다.
static void reserve(bool debugOutput = false)
{
Chunk* block = detail::makeList<T>(OBJ_IN_A_BLOCK);
detail::insertListToList(freed_, block);
poolSize_ += OBJ_IN_A_BLOCK;
if (debugOutput) {
MP_DEBUG("##@@ class name: " << typeid(T).name() << "\n"
"##@@ pool size: " << std::dec << poolSize_
);
}
}
이것으로 MemPoolClass에 대한 설명은 끝입니다. 뒤에 나올 MemPoolShare와 MemPoolSize는 같은 구현을 사용합니다. 따라서 중복되는 내용은 다시 설명하지 않습니다.
MemPoolShare
MemPoolShare는 단지 template parameter로 class T가 아닌 size_t SIZE 를 받는다는 것만 다를 뿐 나머지는 모두 똑같습니다. 문제는 Chunk 클래스가 buf의 크기를 알기 위해 sizeof(T) operator를 사용한다는 것인데 이를 위해 MemPoolHelper.hpp 파일에는 다음과 같이 helper class 가 존재합니다.
template <std::size_t SIZE>
struct HasSize
{
char buf[SIZE];
};
위 클래스를 사용하여 구현한 MemPoolShare는 다음과 같습니다.
template <
std::size_t SIZE,
int OBJ_IN_A_BLOCK = 50,
template <class> class LockModel = NoLock
>
class MemPoolShare : public LockModel<MemPoolShare<SIZE, OBJ_IN_A_BLOCK, LockModel> >
{
typedef detail::HasSize<SIZE> HasSize;
typedef detail::Chunk<HasSize> Chunk;
typedef typename MemPoolShare::Lock Lock;
Chunk의 typedef이 Chunk<T>에서 Chunk<HasSize>로 달라진 것을 볼 수 있습니다.
MemPoolShare는 이 차이점외에는 MemPoolClass와 동일합니다.
MemPoolSize
MemPoolSize 역시 기존의 MemPoolSize와 기본 구현 idea는 동일합니다. 하지만 이번에는 크기별로 각각의 클래스가 생성되도록 template을 작성하였습니다. 이렇게 구현한 이유는 MemPoolHelper에 선언된 Chunk 클래스를 MemPoolSize에서도 사용하기 위해서입니다. 그런데 이 방법에는 한가지 단점이 있는데 이에 대해서 살펴보겠습니다.
각각의 크기별 구현은 다음과 같이 선언됩니다. MemPoolShare와 같습니다만 Base 클래스를 상속받는 점이 다릅니다.
template <std::size_t SIZE, int OBJ_IN_A_BLOCK, template <class> class LockModel>
class Impl : public Base, public LockModel<Impl<SIZE, OBJ_IN_A_BLOCK, LockModel> >
{
typedef detail::HasSize<SIZE> HasSize;
typedef detail::Chunk<HasSize> Chunk;
typedef typename Impl::Lock Lock;
Base 클래스는 다음과 같은 interface 만을 모아놓은 클래스입니다. 각각 다른 Impl 클래스들을 하나의 함수로 다루기 위해 이와 같은 base 클래스가 필요합니다.
class Base
{
public:
virtual void* do_new(std::size_t real_s, const char* file, int line) = 0;
virtual void do_delete(void* p, const char* file, int line) = 0;
virtual int size() const = 0;
virtual int capacity() const = 0;
virtual void printAll() = 0;
virtual void reserve(bool debugOutput) = 0;
};
위와 같은 클래스를 사용하여 다음과 같이 ChunkAllocator 클래스가 작성됩니다. 여기서 보면 get_impl() 함수가 특정 index의 크기에 맞는 클래스를 Base*로 리턴한다는 것을 알 수 있습니다.
template <int MIN_SIZE, int MAX_SIZE, int OBJ_IN_A_BLOCK, template <class> class LockModel>
class ChunkAllocator : public LockModel<ChunkAllocator<MIN_SIZE, MAX_SIZE, OBJ_IN_A_BLOCK, LockModel> >
{
typedef typename ChunkAllocator::Lock Lock;
public:
template <int SIZE>
struct size_helper
{
typedef size_helper_impl<SIZE, MIN_SIZE, MAX_SIZE> type;
};
static void* do_new(int index, std::size_t real_s, const char* file, int line)
{
return get_impl(index)->do_new(real_s,
file,
line
);
}
static void do_delete(int index, void* p, const char* file, int line)
{
get_impl(index)->do_delete(p,
file,
line
);
}
문제는 바로 이 get_impl 함수의 구현입니다. 예를 들어 MIN_SIZE가 32, MAX_SIZE가 4096 이면 Base* 는 모두 log2(4096 / 32) + 1 개가 필요합니다. 이 배열의 갯수인 8 은 compile time에 다음과 같이 계산 가능합니다.
enum {
POOL_COUNT = log_2<MAX_SIZE / MIN_SIZE>::value + 1
};
하지만 이때 실제 리턴해야할 크기의 Impl 타입은 compile time에 모두 지정이 불가능합니다. ((가능할지도 모르나 아직 발견하지 못했습니다.)) 예를 들어 index가 1이라면 실제 리턴해야 할 타입은 다음과 같습니다.
SingletonHolder<typename ImplType<n>::type, LockModel>::instance();
여기서 ImplType은 다음과 같습니다. power_2 는 log_2와 유사하게 만들어진 template 클래스입니다.
template <int N>
struct ImplType
{
typedef Impl<MIN_SIZE * power_2<N>::value, OBJ_IN_A_BLOCK, LockModel> type;
};
따라서 현재는 최대 10개까지의 타입을 하드코딩하여 지정하였으며 만약 이 갯수로 모자랄때는 compile timer assertion이 발생하도록 BOOST_STATIC_ASSERT를 설치하였습니다.
구현된 get_impl 함수는 다음과 같습니다.
static Base* get_impl(int index)
{
enum {
POOL_COUNT = log_2<MAX_SIZE / MIN_SIZE>::value + 1
};
Lock lk; lk;
static Base* impl_[POOL_COUNT] = { 0 };
Base* ret = impl_[index];
if (ret == 0) {
switch (index) {
#define case_n(n) \
case n: ret = SingletonHolder<typename ImplType<n>::type, LockModel>::instance(); break
case_n(0);
case_n(1);
case_n(2);
case_n(3);
case_n(4);
case_n(5);
case_n(6);
case_n(7);
case_n(8);
case_n(9);
// (1)
#undef case_n
}
// If you have compile error in this BOOST_STATIC_ASSERT,
// you should add case statement more in the upper switch(1)
// and the number in the BOOST_STATIC_ASSERT(2).
// I'm looking for more smart and automated method, but not found yet. :(
BOOST_STATIC_ASSERT(POOL_COUNT <= 9); // (2)
impl_[index] = ret;
}
return ret;
}
사용 예제
그럼 MemPoolSize를 사용하는 예제를 하나 살펴보겠습니다. 예제 코드는 아래와 같으며 제일 아랫줄과 같이 일부러 double delete 에러를 발생시켰습니다.
#include <iostream>
#include "MemPoolSize.hpp"
using namespace std;
class A : public MemPoolSize<A>
{
char buf[500];
};
int main()
{
A* a[50] = { 0 };
for (int i = 0; i < 50; ++i) {
a[i] = mp_new(A);
}
for (int i = 0; i < 50; ++i) {
mp_delete(a[i]);
}
mp_delete(a[15]);
}
위 코드를 MTLock을 사용하여 컴파일할때는 boost_thread 라이브러리와 같이 링크하여야 합니다.
실행하여 나온 출력은 다음과 같습니다. 처음 네줄은 최초 50개의 pool을 잡으면서 출력하는 debugging 메시지입니다. 그리고 그 아래는 double delete를 검출하여 이 메모리가 어디서 new 되었는지, 그리고 어디서 최초 delete되었는지를 출력합니다. 제일 위의 occurred 라인에서는 실제 double delete가 발생한 라인을 출력합니다. magic 키는 정상이나 count가 -1이므로 double delete 에러가 발생했음을 알 수 있습니다.
##@@ == MemPool (DEBUG) ============================================
##@@ class size: 512
##@@ pool size: 50
##@@ ===============================================================
##@@ == MemPool (ERROR) ============================================
##@@ occurred: xx.cpp(23)
##@@ ---------------------------------------------------------------
##@@ magic/count: DEADC0DE/-1
##@@ newed: xx.cpp(16) at Mon May 8 10:14:57 2006
##@@ deleteded: xx.cpp(20)
##@@ ===============================================================
결론
이번 글에서는 debugging 기능이 강화된 MemPool 클래스 구현에 대해 알아보았습니다. 이번에 구현된 클래스들은 이전 것들과는 달리 기본 ::operator new, delete보다 성능이나 메모리면에서 overhead를 가지고 있습니다. 하지만 이 기능은 _MP_NOUSE 를 #define 하면 꺼놓을 수도 있으므로 mp_new, mp_delete를 사용하여 코딩해 놓으면 향후 debugging이 어려운 메모리 문제 발생시에 도움이 될것으로 생각됩니다.
긴글 읽으시느라 수고하셨습니다. :-)