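I've recently been writing the HBase plugin for wormhole, which means implementing both an hbase reader and an hbase writer. During testing, it failed with the following error: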
2013-07-08 09:30:02,568 [pool-2-thread-1] org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.processBatchCallback(HConnectionManager.java:1631) WARN client.HConnectionManager$HConnectionImplementation - Failed all from region=t1,,1373246892580.877bb26da1e4aed541915870fa924224., hostname=test89.hadoop, port=60020
java.util.concurrent.ExecutionException: java.io.IOException: Call to test89.hadoop/10.1.77.89:60020 failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected local=/10.1.77.84:51032 remote=test89.hadoop/10.1.77.89:60020]. 59999 millis timeout left.
	at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
	at java.util.concurrent.FutureTask.get(FutureTask.java:83)
	at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.processBatchCallback(HConnectionManager.java:1601)
	at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.processBatch(HConnectionManager.java:1453)
	at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:936)
	at org.apache.hadoop.hbase.client.HTable.put(HTable.java:783)
	at com.dp.nebula.wormhole.plugins.common.HBaseClient.flush(HBaseClient.java:121)
	at com.dp.nebula.wormhole.plugins.writer.hbasewriter.HBaseWriter.commit(HBaseWriter.java:112)
	at com.dp.nebula.wormhole.engine.core.WriterThread.call(WriterThread.java:52)
	at com.dp.nebula.wormhole.engine.core.WriterThread.call(WriterThread.java:1)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
	at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Call to test89.hadoop/10.1.77.89:60020 failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected local=/10.1.77.84:51032 remote=test89.hadoop/10.1.77.89:60020]. 59999 millis timeout left.
	at org.apache.hadoop.hbase.ipc.HBaseClient.wrapException(HBaseClient.java:1030)
	at org.apache.hadoop.hbase.ipc.HBaseClient.call(HBaseClient.java:999)
	at org.apache.hadoop.hbase.ipc.SecureRpcEngine$Invoker.invoke(SecureRpcEngine.java:104)
	at com.sun.proxy.$Proxy5.multi(Unknown Source)
	at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation$3$1.call(HConnectionManager.java:1430)
	at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation$3$1.call(HConnectionManager.java:1428)
	at org.apache.hadoop.hbase.client.ServerCallable.withoutRetries(ServerCallable.java:215)
	at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation$3.call(HConnectionManager.java:1437)
	at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation$3.call(HConnectionManager.java:1425)
	... 5 more
Caused by: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected local=/10.1.77.84:51032 remote=test89.hadoop/10.1.77.89:60020]. 59999 millis timeout left.
2013-07-08 09:30:03,579 [pool-2-thread-6] com.dp.nebula.wormhole.engine.core.WriterThread.call(WriterThread.java:56) ERROR core.WriterThread - Exception occurs in writer thread!
com.dp.nebula.wormhole.common.WormholeException: java.io.IOException: org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@b7c96a9 closed
	at com.dp.nebula.wormhole.plugins.writer.hbasewriter.HBaseWriter.commit(HBaseWriter.java:114)
	at com.dp.nebula.wormhole.engine.core.WriterThread.call(WriterThread.java:52)
	at com.dp.nebula.wormhole.engine.core.WriterThread.call(WriterThread.java:1)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
	at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@b7c96a9 closed
	at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:877)
	at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:857)
	at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.processBatchCallback(HConnectionManager.java:1568)
	at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.processBatch(HConnectionManager.java:1453)
	at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:936)
	at org.apache.hadoop.hbase.client.HTable.put(HTable.java:783)
	at com.dp.nebula.wormhole.plugins.common.HBaseClient.flush(HBaseClient.java:121)
	at com.dp.nebula.wormhole.plugins.writer.hbasewriter.HBaseWriter.commit(HBaseWriter.java:112)
	... 7 more
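wormhole's reader and writer each start their own ThreadPoolExecutor. The error occurs on the writer side during the flush stage, i.e. the final batch insert. My reader gives each thread its own HTable instance and works fine, whereas the writer shares a singleton HBaseClient and uses a ThreadLocal so that each thread gets its own local HTable object, so that is the likely culprit. The simplest fix would be to stop using the singleton HBaseClient on the writer side, which should make the problem go away, but not understanding the root cause was really bugging me...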
Only after reading the source code of HTable and HBaseAdmin did I start to find some clues. Here is the HTable constructor:
    public HTable(Configuration conf, final byte [] tableName) throws IOException {
        this.tableName = tableName;
        this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
        if (conf == null) {
            this.connection = null;
            return;
        }
        this.connection = HConnectionManager.getConnection(conf);
        this.configuration = conf;
        int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
        if (maxThreads == 0) {
            maxThreads = 1; // is there a better default?
        }
        long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
        // ... (creation of this.pool from maxThreads and keepAliveTime omitted in this excerpt)
        ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true);
        this.finishSetup();
    }
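Every HTable instance holds an HConnection object, which is responsible for connecting to ZooKeeper and then to the HBase cluster (for example, locating regions in the cluster, caching region locations, and re-locating a region after it moves). HConnections are managed by HConnectionManager: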
    public static HConnection getConnection(Configuration conf) throws ZooKeeperConnectionException {
        HConnectionKey connectionKey = new HConnectionKey(conf);
        synchronized (HBASE_INSTANCES) {
            HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey);
            if (connection == null) {
                connection = new HConnectionImplementation(conf, true);
                HBASE_INSTANCES.put(connectionKey, connection);
            }
            connection.incCount();
            return connection;
        }
    }
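HConnectionManager keeps a static LRU map, HBASE_INSTANCES, as a cache. Its key is an HConnectionKey, which holds the username and a specific set of properties extracted from the conf passed in; its value is HConnectionImplementation, the concrete HConnection implementation. Because every HTable in my code is created with the same conf, they all point to the same HConnectionImplementation, and each call ends with connection.incCount(), which increments the client reference count by 1.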
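To make the caching behavior concrete, here is a minimal, self-contained Java sketch of a config-keyed, reference-counted cache. It is not HBase code: `ConnKey`, `CountedConn` and `ConnCache` are hypothetical stand-ins for HConnectionKey, HConnectionImplementation and HConnectionManager, a plain `Map<String, String>` stands in for `Configuration`, and the two names in `KEY_PROPS` are an assumed subset chosen for illustration, not the real key property list. It shows the point that matters here: two distinct config objects with the same values resolve to the same connection, and every lookup bumps its count.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for HConnectionKey: equality depends only on the
// username and a fixed subset of properties, not on which config object was passed.
final class ConnKey {
    // Assumed property names, for illustration only.
    static final String[] KEY_PROPS = {"hbase.zookeeper.quorum", "zookeeper.znode.parent"};

    private final String user;
    private final Map<String, String> props = new HashMap<>();

    ConnKey(Map<String, String> conf, String user) {
        this.user = user;
        for (String p : KEY_PROPS) {
            props.put(p, conf.get(p)); // absent properties are stored as null
        }
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ConnKey)) return false;
        ConnKey k = (ConnKey) o;
        return user.equals(k.user) && props.equals(k.props);
    }

    @Override
    public int hashCode() {
        return Objects.hash(user, props);
    }
}

// Hypothetical stand-in for HConnectionImplementation: only tracks its reference count.
final class CountedConn {
    private int refs = 0;

    void incCount() { refs++; }

    int refCount() { return refs; }
}

final class ConnCache {
    // HBase keeps an LRU map here; a plain HashMap is enough for this sketch.
    private static final Map<ConnKey, CountedConn> INSTANCES = new HashMap<>();

    // Same shape as getConnection(): get-or-create under a lock, then incCount().
    static CountedConn getConnection(Map<String, String> conf, String user) {
        ConnKey key = new ConnKey(conf, user);
        synchronized (INSTANCES) {
            CountedConn c = INSTANCES.get(key);
            if (c == null) {
                c = new CountedConn();
                INSTANCES.put(key, c);
            }
            c.incCount();
            return c;
        }
    }
}
```

For example, calling `ConnCache.getConnection` twice with two separate maps that both set `hbase.zookeeper.quorum` to the same value (and the same hypothetical user) returns one `CountedConn` with a count of 2, while a map with a different quorum gets its own entry with a count of 1.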
HTable.close() looks like this:
    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        flushCommits();
        if (cleanupPoolOnClose) {
            this.pool.shutdown();
        }
        if (cleanupConnectionOnClose) {
            if (this.connection != null) {
                this.connection.close();
            }
        }
        this.closed = true;
    }
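When an HTable is closed, it first calls flushCommits(), which sends the List<Put> held in writeBuffer through the connection's processBatch method in one go, and then moves on to closing the connection. That step is also driven by the reference count: the count is decremented first, and only when it reaches 0, or the connection is stale, is the connection actually closed and removed from HBASE_INSTANCES.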
Closing the connection itself involves these steps:
1. Close the HMasterInterface
2. Close each of the HRegionInterface proxies it holds
3. Close the ZooKeeper watcher
The reference-count handling in HConnectionManager that decides whether to actually close:
    HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey);
    if (connection != null) {
        connection.decCount();
        if (connection.isZeroReference() || staleConnection) {
            HBASE_INSTANCES.remove(connectionKey);
            connection.close(stopProxy);
        } else if (stopProxy) {
            connection.stopProxyOnClose(stopProxy);
        }
    } else {
        LOG.error("Connection not found in the list, can't delete it " +
            "(connection key=" + connectionKey + "). May be the key was modified?");
    }
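The release side can be modeled the same way. Again, this is a simplified sketch rather than HBase's implementation: `RefCountedRegistry` and its `acquire`/`release` methods are hypothetical, plain string keys replace HConnectionKey, and "closing" just sets a flag where HBase would tear down the master interface, region server proxies and ZooKeeper watcher. The important property is that once the entry has been removed, any further release finds nothing, which corresponds to the "Connection not found in the list" branch above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the release path: decrement the count and,
// when it reaches zero or the connection is stale, remove the entry from the
// shared cache and close the connection for every holder at once.
final class RefCountedRegistry {
    static final class Conn {
        int refs;
        boolean closed;
    }

    private final Map<String, Conn> instances = new HashMap<>();

    synchronized Conn acquire(String key) {
        Conn c = instances.get(key);
        if (c == null) {
            c = new Conn();
            instances.put(key, c);
        }
        c.refs++;
        return c;
    }

    // Returns false if the key is no longer registered; the HBase code above
    // only logs an error in that case.
    synchronized boolean release(String key, boolean stale) {
        Conn c = instances.get(key);
        if (c == null) {
            return false;
        }
        c.refs--;
        if (c.refs == 0 || stale) {
            instances.remove(key);
            c.closed = true; // stand-in for shutting down master, region servers, ZK watcher
        }
        return true;
    }

    synchronized boolean isRegistered(String key) {
        return instances.containsKey(key);
    }
}
```

Acquiring the same key twice and releasing it twice closes the connection only on the second release; a third release returns `false` because the entry is already gone. Passing `stale = true` closes it immediately, regardless of the count.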
HBaseAdmin works the same way internally: it also holds a reference to an HConnection, so effectively it shares the same HConnection with HTable.
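Putting this analysis together with the earlier error message "java.io.IOException: org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@b7c96a9 closed", the problem had to be the client reference count. Sure enough, my HBaseClient code creates a singleton HBaseClient during initialization, which in turn creates one HBaseAdmin shared by all threads, and each thread creates its own ThreadLocal HTable object when it starts: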
    private HTable getHTable() throws IOException {
        HTable htable = threadLocalHtable.get();
        if (htable == null) {
            htable = new HTable(conf, tableName);
            htable.setAutoFlush(autoFlush);
            htable.setWriteBufferSize(writeBufferSize);
            threadLocalHtable.set(htable);
        }
        return htable;
    }
But my close method was wrong: it closed the HBaseAdmin many times (once per thread):
    public synchronized void close() throws IOException {
        HTable table = threadLocalHtable.get();
        if (table != null) {
            table.close();
            table = null;
            threadLocalHtable.remove();
        }
        if (admin != null) {
            admin.close();
        }
    }
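Consider a scenario with 10 writer threads: the reference count is 11 (10 HTables plus 1 HBaseAdmin). Each thread's close() releases two references, its own HTable and the shared HBaseAdmin, so the first 5 threads close without trouble and leave the count at 1. The 6th thread then closes its HTable, the count drops to 0, and the HConnection is closed; its subsequent admin.close() does nothing, because the connection is no longer in HBASE_INSTANCES. Meanwhile the other 4 threads may still be flushing, and with the HConnection already gone the flush cannot complete, so an exception is thrown.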
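The arithmetic of this scenario is easy to check. The following is a hypothetical model (`BuggyCloseSimulation` is not part of wormhole or HBase) that assumes, as above, that the threads close one after another and that each buggy close() releases one HTable reference followed by one HBaseAdmin reference.

```java
// Hypothetical arithmetic model of the buggy shutdown: every writer thread's
// close() releases its own HTable reference and then the shared HBaseAdmin
// reference again, so the shared count drains two per thread instead of one.
final class BuggyCloseSimulation {
    /**
     * Returns the 1-based index of the writer thread whose close() drives the
     * shared reference count to zero, or -1 if that never happens.
     */
    static int threadThatClosesConnection(int writers) {
        int refs = writers + 1;        // one per HTable plus one for the HBaseAdmin
        for (int t = 1; t <= writers; t++) {
            if (--refs == 0) return t; // table.close()
            if (--refs == 0) return t; // admin.close(), repeated by every thread
        }
        return -1;
    }
}
```

`threadThatClosesConnection(10)` returns 6, so threads 7 to 10 are the ones left flushing against a closed connection. More generally, with n writers the count of n + 1 drains by two per thread, so the connection is torn down during thread ⌈(n + 1) / 2⌉'s close, roughly halfway through the shutdown.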
The flush() those remaining threads are running:
    public void flush() throws IOException {
        if (getPutBuffer().size() > 0) {
            getHTable().put(getPutBuffer());
            clearPutBuffer();
        }
    }
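Once I knew the cause, the fix was simply to set admin to null right after closing it, so the HBaseAdmin can no longer be closed repeatedly. Because close() is synchronized, only the first thread to call it closes the admin; after that each thread releases only its own HTable, and the connection is closed only when the last HTable is.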
    if (admin != null) {
        admin.close();
        admin = null;
    }
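A small threaded model can confirm that this change releases the admin's reference exactly once. It is a sketch under stated assumptions, not the actual wormhole HBaseClient: `FixedClientModel`, `Conn` and `Handle` are hypothetical stand-ins for HBaseClient, HConnection and HTable/HBaseAdmin, and "flushing" is reduced to checking whether the shared connection is still open. Every thread opens its table before any thread closes, mirroring the scenario above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the fixed HBaseClient: one ThreadLocal table per thread,
// one shared admin, and a synchronized close() that nulls the admin after
// closing it, so the admin's reference is released exactly once.
final class FixedClientModel {
    // Stand-in for the shared HConnection: closes when its count reaches zero.
    static final class Conn {
        private int refs;
        private boolean closed;

        synchronized void acquire() { refs++; }

        synchronized void release() {
            if (closed) return;             // already torn down
            if (--refs == 0) closed = true; // last reference gone
        }

        synchronized boolean isClosed() { return closed; }
    }

    // Stand-in for HTable / HBaseAdmin: takes one reference, gives it back on close().
    static final class Handle {
        private final Conn conn;

        Handle(Conn conn) { this.conn = conn; conn.acquire(); }

        void close() { conn.release(); }
    }

    private final Conn conn = new Conn();
    private Handle admin = new Handle(conn); // shared by all threads
    private final ThreadLocal<Handle> localTable = new ThreadLocal<>();

    Conn connection() { return conn; }

    // Same shape as getHTable(): create on first use, then reuse within the thread.
    Handle getTable() {
        Handle t = localTable.get();
        if (t == null) {
            t = new Handle(conn);
            localTable.set(t);
        }
        return t;
    }

    synchronized void close() {
        Handle t = localTable.get();
        if (t != null) {
            t.close();
            localTable.remove();
        }
        if (admin != null) {
            admin.close();
            admin = null; // the fix: later callers skip the admin
        }
    }

    // Runs `writers` threads that open a table, "flush", then close.
    // Returns how many threads found the connection already closed at flush time.
    static int run(FixedClientModel client, int writers) {
        CountDownLatch allOpen = new CountDownLatch(writers);
        AtomicInteger sawClosed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < writers; i++) {
            Thread th = new Thread(() -> {
                client.getTable();
                allOpen.countDown();
                try {
                    allOpen.await(); // every table exists before anyone closes
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (client.connection().isClosed()) { // stand-in for flush()
                    sawClosed.incrementAndGet();
                }
                client.close();
            });
            threads.add(th);
            th.start();
        }
        for (Thread th : threads) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return sawClosed.get();
    }
}
```

With the fix, `run(new FixedClientModel(), 10)` always returns 0: a thread's own table still holds a reference when it flushes, so the count cannot reach zero before that thread closes, and the connection is closed once all 11 references are released. Deleting the `admin = null;` line lets every close() release the admin reference again, and then some threads can find the connection already closed; how many depends on thread scheduling.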
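This problem bothered me for several days and I couldn't find anything about it online; in the end it was reading the HBase source code that revealed the cause and led to the fix. Lesson learned: when you really run into a problem like this, you have to understand the underlying code.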