MySQL批量写入

MySQL批量写入,通常可以使用JDBCTemplate的batchUpdate

1
2
3
public int [] batchUpdate (String sql, final BatchPreparedStatementSetter pss) throws DataAccessException {
}

使用后,针对批量操作,jdbc driver会render成批量语句发送给MySQL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
Log log = logs.get(i);
ps.setLong(1,log.getKeyid());
ps.setLong(2,log.getUserid());
ps.setLong(3,log.getPlanid());
ps.setLong(4,log.getUnitid());
ps.setLong(5,log.getLevel());
ps.setInt(6,log.getType());
}
@Override
public int getBatchSize() {
return logs.size();
}
});

今天一个线上case排查中发现,最终没有生效,SQL仍然是一条一条的发送出去,整体的性能下降明显。

查阅资料发现,原来MySQL默认是不支持batch的,jdbc driver虽然提供了batch接口,但是默认并没有开启,需要给JDBC Connection增加配置参数rewriteBatchedStatements=true,示例配置:

jdbc:mysql://10.10.10.38:5858?characterEncoding=gbk&rewriteBatchedStatements=true

PrepareStatement在执行executeBatch的时候,会对该参数进行判断,来进行批量操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public int[] executeBatch() throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
if (this.connection.isReadOnly()) {
throw new SQLException(Messages.getString("PreparedStatement.25") //$NON-NLS-1$
+ Messages.getString("PreparedStatement.26"), //$NON-NLS-1$
SQLError.SQL_STATE_ILLEGAL_ARGUMENT);
}
if (this.batchedArgs == null || this.batchedArgs.size() == 0) {
return new int[0];
}
// we timeout the entire batch, not individual statements
int batchTimeout = this.timeoutInMillis;
this.timeoutInMillis = 0;
resetCancelledState();
try {
statementBegins();
clearWarnings();
if (!this.batchHasPlainStatements
&& this.connection.getRewriteBatchedStatements()) {
if (canRewriteAsMultiValueInsertAtSqlLevel()) {
return executeBatchedInserts(batchTimeout);
}
if (this.connection.versionMeetsMinimum(4, 1, 0)
&& !this.batchHasPlainStatements
&& this.batchedArgs != null
&& this.batchedArgs.size() > 3 /* cost of option setting rt-wise */) {
return executePreparedBatchAsMultiStatement(batchTimeout);
}
}
return executeBatchSerially(batchTimeout);
} finally {
this.statementExecuting.set(false);
clearBatch();
}
}
}

需要指出的时,该参数在JDBC 5.1.8开始才开始支持,5.1.17进行了优化,如果采用该机制,期望采用5.1.17+的版本。

搭建博客环境

今天在Mac下搭建了Hexo环境,先说说搭建环境方面的问题。

###环境准备


Mac环境下最好先安装homeblew,避免后续的各种麻烦,安装过程非常简单:

1
$ ruby -e "$(curl -fsSL https://raw.github.com/Homebrew/homebrew/go/install)"

然后用blew安装Node.js环境。

1
$ blew install node

接下来进行hexo的安装

1
$ npm install -g hexo

简单的等待后,一切就绪,然后可以进行相关目录,初始化博客目录。

1
2
3
$ hexo init github-blog
$ cd github-blog
$ npm install

接下来就可以对_config.yml等文件进行配置,进行博客撰写了。

###博客发布


配置github信息

deploy:
  type: github
  repository: https://github.com/home3k/home3k.github.io.git
  branch: master

接下来进行发布即可。

1
$ hexo deploy --generate

起步

有一段时间,经常会写写东西。不过,如梭带来了浮躁,浮躁带来的懒惰,慢慢地,文字就停了。

从今天开始,采用Markdown+hexo写自己的独立博客。之前的博客内容,也全部迁移过来。技术博客托管在Github上,生活博客托管在Gitcafe上。

希望自己能够坚持写下去,加油!

1
$ blog start

初始Vert.x

Vert.x框架最近比较火,它号称JVM上的Node.js。

http://vertx.io/

它目前支持JavaScript,Groovy,Ruby,Java,正在对Clojure,Scala,Python等。跟node.js一样,它是异步的、基于EDA的架构,跟Node.js一样,它拥有良好的并发及消息传递性能。

它支持HTTP/HTTPS,同时支持WebSockets,sockjs等高级协议。

其官方的benchmark相当漂亮:

Vert.x Benchmark


###event-driven模型

在vert.x内部,每一个vert.x实例称为一个verticle。verticle被bind到一个统一的event bus上,进行事件循环。其中一部分verticle是提供一些共享的lib(如共享的event handler),这种verticle被称为busmod。

通过event bus,Verticle可与运行在相同或不同vert.x实例中的其他verticle进行通信(基于Actor model)。

Vert.x verticle


###vert.x的线程模型

在vert.x中,每一个verticle(busmod)都被绑定到一个特殊的线程,所以在对verticle进行开发时,无需关注他的线程安全性。因此,对于vert.x instance,它管理着一个Thread Set,每个线程都维护了一个event loop。vert.x内部通过一个线程池,选择将一个event循环分配给具体的verticle。具体的线程响应模型采用了Reactor pattern

###内部实现

从其事件处理流程上很容易想到netty,vert.x在底层确实使用了netty。通过netty来处理高并发响应,reactor式的分派请求。vertx的ClusterManager基于Hazelcast进行轻量级实现。

具体细节还得看代码细节,有机会再分享https://github.com/vert-x/vert.x

简单的实现demo

1
2
3
4
5
6
7
8
9
Vertx vertx = Vertx .newVertx();
vertx.createHttpServer().requestHandler(
new Handler< HttpServerRequest>() {
public void handle(HttpServerRequest req) {
String file = req.path.equals( "/" ) ? "index.html"
: req.path;
req.response.sendFile( "webroot/" + file);
}
}).listen(8080);

DirectMemory源码分析

最近对cache相关进行了调研,看了一下off-heap cache DirectMemory源码,对其进行如下梳理:
源码路径:https://github.com/raffaeleguidi/DirectMemory

###引

Java cache通常的做法是通过缓存对象报错在heap,通过一定的持久化机制保存在disk。为了防止缓存对象被gc,通常用弱引用等wrap一下。

考虑到heap容量,缓存达到一定的容量必然会发生gc(full),由于full gc的STW,当heap容量达到10g以上时的pause time几乎是无法容忍的。

为了防止gc带来的性能问题,部分cache系统开始使用off-heap机制,通过对堆外内存的自主管理,防止额外的性能问题。这里面比较典型的是ehcache被terracotta收购后推出的BigMemory(http://www.ehcache.org/documentation/user-guide/bigmemory)

根据terracotta测试,BigMemory在350G+的场景下,表现良好。

由于BigMemory不开源,有个开源版的DirectMemory,实现跟其比较类似。

###DirectMemory

DirectMemory代码结构比较简单:

DirectMemory代码结构

下面结合其源码及数据的put操作,对其进行介绍:

####Cache

Cache作为入口,它通过ConcurrentMap 维护了String->Pointer,该Map的作用,后面会有介绍。该map通过google guava框架的MapMaker创建(可以方便的进行超时事件等)

Cache作为DirectMemory的入口,暴露了缓存操作的大部分接口,如put retrieve free等

需要指出的是,为了防止cache实例进入heap,cache的创建,属性同时通过static创建的。这也导致Cache在同一个JVM中是singleton的,无法根据需求创建多个cache

Cache层对对象的序列化是通过serialization包下的序列化器进行的,默认采用的是ProtoStuff。

下面是其put操作代码,从代码可以看到,Cache层主要通过调用memory包下的MemoryManager相应接口,进行cache的各种操作。同时操作后更新其本层Map

1
2
3
4
5
public static Pointer putByteArray (String key, byte[] payload, int expiresIn) {
Pointer ptr = MemoryManager. store(payload, expiresIn);
map.put(key, ptr);
return ptr;
}


####MemoryManager

MemoryManager维护了

1
2
public static List<OffHeapMemoryBuffer> buffers = new Vector<OffHeapMemoryBuffer>();
public static OffHeapMemoryBuffer activeBuffer = null;

activeBuffer即为当前buffer,如果当前buffer满了,则跳到buffers的下一个buffer中,这些buffer在buffers中,很显然,通过对buffer的分片,可以较好的提高并发度。

下面的代码是MemoryManager进行store操作的过程,里面用很明显的逻辑进行了切(分)片操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static Pointer store(byte[] payload, int expiresIn) {
Pointer p = activeBuffer .store(payload, expiresIn);
if (p == null) {
if (activeBuffer.bufferNumber+1 == buffers.size()) {
return null ;
} else {
// try next buffer
activeBuffer = buffers.get(activeBuffer.bufferNumber+1);
p = activeBuffer .store(payload, expiresIn);
}
}
return p;
}

MemoryManager中维持的Buffer是OffHeapMemoryBuffer。

####OffHeapMemoryBuffer

OffHeapMemoryBuffer是off-heap写入的核心组件,其核心为一个

1
protected ByteBuffer buffer ;

通过ByteBuffer.allocateDirect(capacity),生成的DirectByteBuffer。作为NIO提供的Buffer,它可以更高效的进行off-heap操作。

为了更高效的进行buffer读写,它提供了

1
public List<Pointer> pointers = new ArrayList<Pointer>();。

对于Pointer对象,其核心字段:

1
2
3
4
5
public int start ; //buffer的开始位置
public int end ; //buffer的结束位置
public boolean free ; //pointer是否可用
public int bufferNumber ; //所属的buffer number
.....

从本质上讲,它标识了一个buffer分片。通过将数据存储在这些细粒度分片上,可以更好的进行数据获取,并可以围绕key进行必要的操作,例如:Cache中包含一个key-Pointer的ConcurrentHashMap,这样在进行retrieve或update操作时,可以通过key先获得Pointer,然后可以通过pointer快速地对buffer进行操作。

初始化时,Pointer为buffer大小。每次store的时候,先查找是否有free的Pointer,如果存在,执行slice,将原来的pointer切分出一块数据大小的新的pointer,封装pointer属性,然后slice buffer,并忘buffer里写数据。同时将报错数据的pointer放入到pointers中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private synchronized Pointer store(byte[] payload, long expiresIn, long expires) {
Pointer goodOne = firstMatch(payload. length);
if (goodOne == null ) {
throw new NullPointerException("did not find a suitable buffer");
}
Pointer fresh = slice(goodOne, payload. length);
fresh. created = System.currentTimeMillis();
if (expiresIn > 0) {
fresh. expiresIn = expiresIn;
fresh. expires = 0;
} else if (expires > 0) {
fresh. expiresIn = 0;
fresh. expires = expires;
}
fresh. free = false ;
used.addAndGet(payload.length );
ByteBuffer buf = buffer.slice();
buf.position(fresh. start);
try {
buf.put(payload);
} catch (BufferOverflowException e) {
// RpG not convincing - let's fix it later
goodOne. start = fresh.start ;
goodOne. end = buffer .limit();
return null ;
}
pointers.add(fresh);
return fresh;
}


###其他

  1. 从MemoryManager store操作代码可以看到,当store操作时,如果当前buffer空间不足时,直接进行换切片操作,显然如果数据非常大时,OffHeapMemoryBuffer存在空间浪费。同时纵观其代码,其内存管理比较粗,目前基本只涉及过期处理,LFU策略等。Pointer空间释放后,也没有必要的合并操作。内存空间浪费应该是个问题。
  2. key过期处理时,对Pointer进行查找时,在设计上采用了JoSQL框架,通过SQL方式获得Pointer,代码比较明晰。

伪共享

近期对并发编程进行了研究,看到已经从jsr166y添加到java 7里的LinkedTransferQueue及Disruptor框架均有对伪共享(False Sharing)问题进行了单独的处理,于是,对此类问题进行了梳理:

目前典型的CPU架构有三级缓存,从读取速度由快而慢分别为L1 -> L2 ->L3。对于多核架构,L1,L2为单核独享,而L3为多核共享。CPU指令执行即从L1 cache开始读取,如果cache miss,便从下一级cache读取,直到内存。

为了高效利用cache,CPU不是简单地将单条数据(指令)写入cache,而是将一批数据指令(连续地)写入cache行中。即cache行是对cache进行读写操作的最小单位。对于目前典型的core,ivy,sandy等cpu,cache行大小为64bytes。

同时,对于多核系统,每个核有私有的L1,L2,多线程并发时,如果某个核需要修改的变量同时在另外一个核的cache中,为了保证数据的一致性,需要使当前核cache中变量失效(invalidate),然后同步一致数据。这种操作是有硬件层级的缓存一致性协议来保证的,通常是M-E-S-I协议。其中M,E,S和I代表使用cache行所处的四个状态,协议通过四种状态的(复杂)迁移(类似状态机模型)来保证一致性。当发生上述不一致时,当前核会发出RFO(Request For Owner)请求来保证一致性,但是这个保证过程需要低层级cache或者内存的同步,会对性能造成很大的影响。
对于Java对象,相邻的成员变量被加载到同一个cache行中,当不同线程对成员变量分别操作时,就会导致RF0请求的发生,这种现象即伪共享。

Disruptor设计示意图

上图为disruptor作者设计的示意图,x,y变量分别被load到c1,c2的cache行中,c1更新x,c2更新y,而x,y位于同一条cache行中,此时两个线程轮番发送RFO消息,占有cache行拥有权,获得拥有权线程对变量的更新会导致其他核中的cache行中的变量失效,进而通过L3进行变量同步,而此时如果L3 miss,还需要通过内存同步,对性能造成很大的影响。因此,虽然x,y被独立线程操作,彼此无任何关系,因为伪共享,性能有很大的问题。
比较悲催的是,上面的x,y变量的伪共享在生产者-消费者模式中比较常见。生产者-消费者模式中,生产者和消费者作为不同的线程不停地操作队列的首尾两端(通过head,tail指针),而这两个指针对象定义在一起,加载head时,会将tail同时加载到同一个cache line中。

例如:Java j.u.c LinkedBlockingQueue的

1
2
3
4
5
/** Head of linked list */
private transient Node<E> head;
/** Tail of linked list */
private transient Node<E> last;

head,last共占8bytes。两者被加载到同一个cache line。大量的生产者、消费者对queue进行读写时,会发生较大的性能问题

为了防止伪共享的发生,通常进行缓存行补齐(cache line padding),即对对象进行填充,使其占用一个cache行。这样可以保证对象不处于同一缓存行。

对于Hotspot Java对象,Java程序的对象头固定占8字节(32位系统),此时只需要填48bytes即可保证对象处于,不同的缓存行, 从而避免伪共享。(对于64位系统,对象头占用空间更大,多出也无所。)

例如:

在 LinkedTransferQueue中(早期的jsr166y版本,收录在早期的netty项目中),队列的head,tail如下定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/** head of the queue */
private final PaddedAtomicReference<QNode> head;
/** tail of the queue */
private final PaddedAtomicReference<QNode> tail;
/**
* Padded version of AtomicReference used for head, tail and
* cleanMe, to alleviate contention across threads CASing one vs
* the other.
*/
private static final class PaddedAtomicReference<T> extends AtomicReference<T> {
private static final long serialVersionUID = 4684288940772921317L;
// enough padding for 64bytes with 4byte refs
Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
PaddedAtomicReference(T r) { super(r); }
}

PaddedAtomicReference通过15个4byte对象,对AtomicReference进行了填充,从避免了伪共享,LinkedTransferQueue后期版本对其设计进行了更新,但核心类似,只是方法更加优雅(还没有完全看懂)。

在Disruptor框架中,其核心数据结构RingBuffer的序号由Sequence对象来维护,Sequence对象,定义了

1
private final long[] paddedValue = new long[15];

通过15个long来填充。序号设置/获取时分别调用

1
2
3
4
5
6
7
public long get(){
return unsafe.getLongVolatile(paddedValue, valueOffset);
}
public void set(final long value){
unsafe.putOrderedLong(paddedValue, valueOffset, value);
}

这样通过sun unsafe的CAS操作,对序号进行填充,从而避免了伪共享。

JAVA 7 G1收集器调研

最近在看虚拟机垃圾收集,看到了JAVA 7 G1收集器的相关内容,特深入调研了下。

G1收集器全称Garbage-First Garbage Collector。是在Java 6 Update 14中引入,旨在取代CMS收集器的一种新型收集器。在Java 6中只是试验性的引入,因各种原因没有正式引入。Java 7开始,其被正式引入。

作为一个server-style回收器,其具有如下属性:

1. 并行和并发

众所周知,目前所有的GC(无论是serial,parallel及近年来广泛使用的CMS)均存在暂停时间问题,所谓的暂停时间是由于GC的“stop-the-world” 机制(这个机制简称STW,即,在执行垃圾收集算法时,Java应用程序的其他所有除了垃圾收集帮助器线程之外的线程都被挂起)。而G1 可以从最新的硬件中获得并行的能力。它能够使用所有可用的CPU(CPU多核,硬件多线程,等)来加速它的STW暂停时间。虽然其并行机制在CMS中已有了一定的实现(即周期性的进行并发标记[concurrent marking phase]),但G1采用了新的实现方式。

该机制与G1新的堆内存管理机制相关。与其他GC收集器不同,在G1中,对象的新生代和老一代上并没有在物理上分隔开,而是把一个连续的堆内存拆分成了几个相同大小的区域。新生对象和老对象都会被放在一系列可能不连续的区域中。之所以这样做,就是为了让G1可以更灵活地移动老对象所占用的资源给新的对象。G1中的内存收集会发生 “疏散暂停”,当内存从一系列区域开始回收时,这些区域所引用的对象会被疏散到另一些区域中,这样,会有一整块的内存来重新被申请(其思想跟垃圾收集算法中的复制算法很类似)。疏散会发生整个程序的暂停,但“疏散”这些内存可以被并行运行,这正是G1的并发阶段做的事情。

2. 分代处理

与其它的HotSpot 垃圾回收器一样,G1 也是分代的。即它在处理新分配的对象(年轻代)和已经生存了一段时间的对象(年老代)时会不同,它会更多地考虑一些新创建的对象实例,因为越新创建的就越有最大的可能性被回收,老对象只是偶尔访问一下。对于大多数的Java应用来说,这个机制可以极大地提高回收效率。

3. 紧凑内存(碎片整理

与CMS收集器不同,G1 会对堆进行内存整理。压缩可以消除潜在的内存碎片的问题,这样程序就可以更长时间的平滑运行。

4. 预见性

G1 比起 CMS 来有更多的预见性。这个主要还是用来消除内存碎片的问题。内存的碎片少了,STW的暂停时间也会被减少。

目前G1仍然还在试验阶段,使用下面两个参数可以打开G1机制:

-XX:+UnlockExperimentalVMOptions -XX:+UseG1GC

目前G1收集器还存在如下问题:

  1. G1不支持 JVM TI JMX等工具,由于相当数量的JVM管理及监控工具都是基于这两个服务的,因此基于G1很多工具无法正确使用。
  2. G1不支持增量永生代收集。因此,在应用卸载类时,无法进行收集。
  3. STW的暂停时间不太稳定,与CMS相比,时好时坏。

It is running out

I think i’m drowning
asphyxiated
i wanna break this spell
that you’ve created

you’re something evil
a contradiction
i wanna finish the game
and kill the friction

But you will be the death of me
you are just the death of me

bury it
i wanna bury it
i wanna smother it
i wanna murder it

I hope it is running out
and I hope the time is running out

But I can’t push it underground
I can’t stop it screaming out

I wanted freedom
but I’m restricted
I tried to give it up
but I’m addicted

Maybe you know i’m trapped
sense of elation
I know you’ll never dream of
breaking this fixation

you will squeeze the life out of me
til it run out.

Find a home

Find A Home (New Forest Shaker)
              Delays

One more year of digging here
And we’re alight in heaven; we’re alight in heaven
If we bare the stones and stares
Then we’re alight in heaven, we’re alight in heaven,

Mother says to hold our tongues; we are the chosen ones,
And we answer to no one,

Same dream I’m always having,
Like shivering, shivering, shivering…

Find a home amongst the trees,
Bend your branches over me,
Find a home, defy the freeze,
Dance around the rosaries…

Faith alone must clear this snow
Or we’ll have doubted heaven; we’ll have doubted heaven
It’s finisterre for dancing bears
If we have doubted heaven; we have doubted heaven

Everyone I left behind, they think I left my mind under Mesmer, sola fide
Same dream I’m always having,
Like shivering, shivering, shivering…

Find a home amongst the trees,
Bend your branches over me,
Find a home; defy the freeze, and glow,

Find a home…

Oh I can row, I can row, I can row back home,
Or we can lay, we can lay, we can lay in Sway,

Exogenesis Symphony, Pt. 2 Cross-Pollination

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package music.album.the.resistance;
import foobar;
import last.fm;
/**
* 编程需要交流
* @author Muse
* @author Sukani
**/
public class Exogenesis Symphony, Pt. 2 Cross-Pollination {
public void sing() {
/*我们不受周围人的影响*/
Rise above the crowds;
/*我们的充满辐射的环境下工作*/
Wade through toxic clouds;
/*我们要打破欧美软件列强的垄断*/
Breach the outer sphere;
/*我们虽然没有把握, 但是相互交流或许能提高*/
The edge of all our fears rest with you;
/*我们需要交流!*/
We are counting on you;
/*我们需要交流!*/
It's up to you;
/*赶紧把代码共享出来!*/
Spread our codes to the stars;
/*交流才是王道*/
You must rescue us all;
/*赶紧把代码共享出来!*/
Spread our codes to the stars;
/*交流才是王道*/
You must rescue us all;
/*告诉大家*/
Tell us;
/*你这段代码倒是是干什么用的!*/
Tell us your final wish;
/*说不明白就别想回去!*/
Now we know you can never return;
/*告诉大家*/
Tell us;
/*你这段代码倒是是干什么用的!*/
Tell us your final wish;
/*我们要Open-Source, 与全球的同行分享!*/
We will tell it to the world;
}
public void afterSing() {
if (action.equals("闲的!")) {
System.out.println("也是被论文逼得.");
}
}
}