DirectMemory源码分析

最近对cache相关进行了调研,看了一下off-heap cache DirectMemory源码,对其进行如下梳理:
源码路径:https://github.com/raffaeleguidi/DirectMemory

Java cache通常的做法是通过缓存对象报错在heap,通过一定的持久化机制保存在disk。为了防止缓存对象被gc,通常用弱引用等wrap一下。

考虑到heap容量,缓存达到一定的容量必然会发生gc(full),由于full gc的STW,当heap容量达到10g以上时的pause time几乎是无法容忍的。

为了防止gc带来的性能问题,部分cache系统开始使用off-heap机制,通过对堆外内存的自主管理,防止额外的性能问题。这里面比较典型的是ehcache被terracotta收购后推出的BigMemory(http://www.ehcache.org/documentation/user-guide/bigmemory)

根据terracotta测试,BigMemory在350G+的场景下,表现良好。

由于BigMemory不开源,有个开源版的DirectMemory,实现跟其比较类似。

DirectMemory

DirectMemory代码结构比较简单:

DirectMemory代码结构

下面结合其源码及数据的put操作,对其进行介绍:

Cache

Cache作为入口,它通过ConcurrentMap 维护了String->Pointer,该Map的作用,后面会有介绍。该map通过google guava框架的MapMaker创建(可以方便的进行超时事件等)

Cache作为DirectMemory的入口,暴露了缓存操作的大部分接口,如put retrieve free等

需要指出的是,为了防止cache实例进入heap,cache的创建,属性同时通过static创建的。这也导致Cache在同一个JVM中是singleton的,无法根据需求创建多个cache

Cache层对对象的序列化是通过serialization包下的序列化器进行的,默认采用的是ProtoStuff。

下面是其put操作代码,从代码可以看到,Cache层主要通过调用memory包下的MemoryManager相应接口,进行cache的各种操作。同时操作后更新其本层Map

1
2
3
4
5
 public static Pointer putByteArray (String key, byte[] payload, int expiresIn) {
Pointer ptr = MemoryManager. store(payload, expiresIn);
map.put(key, ptr);
return ptr;
}


MemoryManager

MemoryManager维护了

1
2
public static List<OffHeapMemoryBuffer> buffers = new Vector<OffHeapMemoryBuffer>();
public static OffHeapMemoryBuffer activeBuffer = null;

activeBuffer即为当前buffer,如果当前buffer满了,则跳到buffers的下一个buffer中,这些buffer在buffers中,很显然,通过对buffer的分片,可以较好的提高并发度。

下面的代码是MemoryManager进行store操作的过程,里面用很明显的逻辑进行了切(分)片操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
        public static Pointer store(byte[] payload, int expiresIn) {
Pointer p = activeBuffer .store(payload, expiresIn);
if (p == null) {
if (activeBuffer.bufferNumber+1 == buffers.size()) {
return null ;
} else {
// try next buffer
activeBuffer = buffers.get(activeBuffer.bufferNumber+1);
p = activeBuffer .store(payload, expiresIn);
}
}
return p;
}

MemoryManager中维持的Buffer是OffHeapMemoryBuffer。

OffHeapMemoryBuffer

OffHeapMemoryBuffer是off-heap写入的核心组件,其核心为一个

1
protected ByteBuffer buffer ;

通过ByteBuffer.allocateDirect(capacity),生成的DirectByteBuffer。作为NIO提供的Buffer,它可以更高效的进行off-heap操作。

为了更高效的进行buffer读写,它提供了

1
public List<Pointer> pointers = new ArrayList<Pointer>();。

对于Pointer对象,其核心字段:

1
2
3
4
5
 public int start ;  //buffer的开始位置
public int end ; //buffer的结束位置
public boolean free ; //pointer是否可用
public int bufferNumber ; //所属的buffer number
.....

从本质上讲,它标识了一个buffer分片。通过将数据存储在这些细粒度分片上,可以更好的进行数据获取,并可以围绕key进行必要的操作,例如:Cache中包含一个key-Pointer的ConcurrentHashMap,这样在进行retrieve或update操作时,可以通过key先获得Pointer,然后可以通过pointer快速地对buffer进行操作。

初始化时,Pointer为buffer大小。每次store的时候,先查找是否有free的Pointer,如果存在,执行slice,将原来的pointer切分出一块数据大小的新的pointer,封装pointer属性,然后slice buffer,并忘buffer里写数据。同时将报错数据的pointer放入到pointers中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private synchronized Pointer store(byte[] payload, long expiresIn, long expires) {
Pointer goodOne = firstMatch(payload. length);

if (goodOne == null ) {
throw new NullPointerException("did not find a suitable buffer");
}

Pointer fresh = slice(goodOne, payload. length);


fresh. created = System.currentTimeMillis();
if (expiresIn > 0) {
fresh. expiresIn = expiresIn;
fresh. expires = 0;
} else if (expires > 0) {
fresh. expiresIn = 0;
fresh. expires = expires;
}

fresh. free = false ;
used.addAndGet(payload.length );
ByteBuffer buf = buffer.slice();
buf.position(fresh. start);
try {
buf.put(payload);
} catch (BufferOverflowException e) {
// RpG not convincing - let's fix it later
goodOne. start = fresh.start ;
goodOne. end = buffer .limit();
return null ;
}
pointers.add(fresh);
return fresh;
}


其他

  1. 从MemoryManager store操作代码可以看到,当store操作时,如果当前buffer空间不足时,直接进行换切片操作,显然如果数据非常大时,OffHeapMemoryBuffer存在空间浪费。同时纵观其代码,其内存管理比较粗,目前基本只涉及过期处理,LFU策略等。Pointer空间释放后,也没有必要的合并操作。内存空间浪费应该是个问题。
  2. key过期处理时,对Pointer进行查找时,在设计上采用了JoSQL框架,通过SQL方式获得Pointer,代码比较明晰。