现象
偶发Filesystem close错误

原因
1、基础包中
HdfsUtil#saveLogToHdfs finally中调用hadoopFS.close();
最终会设置DFSClient中的clientRunning变量值为fasle,之前报错的堆栈信息中可以看到DFSClient#check方法就是在检查这个成员变量。虽然设置了fs.hdfs.impl.disable.cache
参数,但在HdfsUtil#getHadoopFS(java.lang.String)中又维护了一个缓存,且更新缓存的判断只是判断了fileSystem为空,当saveLogToHdfs方法触发了hadoopFS.close()时,fileSystem实例依然存在,但内部持有的DFSClient实例的成员变量clientRunning值已经被设置为了fasle,此后再使用fileSystem实例做其他操作时就会触发Filesystem closed错误。
2、指南针用的不是基础包中HdfsUtil类,但他们的工具类中delete download两个方法中均有调用Filesystem#close的地方,且没有设置fs.hdfs.impl.disable.cache参数。
3、其他触发Filesystem closed的可能点:除上述由于客户端代码引起,多线程同时使用同一Filesystem实例操作hdfs时,当其中一个线程由于namenode繁忙导致连接超时会设置DFSClient的clientRunning值为false,从而影响其他线程使用Filesystem实例。此情况源码中未验证,应该已在2.6.1后修改,参考jira:https://issues.apache.org/jira/browse/HDFS-7314?jql=project%20%3D%20HDFS%20AND%20status%20%3D%20Closed%20AND%20text%20~%20%22Filesystem%20closed%22
解决办法
1、针对基础包HdfsUtil工具类中又维护了缓存情况。
方法一:应该判断底层DFSClient类的clientRunning的值,但改值私有的且没有提供共有get方法,只能通过反射获取,另外DFSClient实例只存在DistributedFileSystem子类中,父类FileSystem中并不持有改对象,只能强制转换。代码如下:
/**
-
判断FileSystem实例是否可用
-
@param fs Hdfs的实现类 DistributedFileSystem
-
@return
-
@throws Exception
*/
private static boolean fsAviable(FileSystem fs) throws Exception {
//FileSystem fs =HdfsUtil.getHadoopFS("hadoop");
DFSClient client = ((DistributedFileSystem) fs).getClient();
Class clz = client.getClass();
Method isClientRunning = clz.getDeclaredMethod("isClientRunning");
isClientRunning.setAccessible(true);
Object invoke = isClientRunning.invoke(client);
System.out.println("fsAviable------" + invoke);
return (Boolean)invoke;
}
改方式修改前后已通过单元测试:测试代码如下:
public static void main(String[] args){
ExecutorService threadPool1 = Executors.newCachedThreadPool();
for(int i=0; i<3; i++){
threadPool1.execute(new Runnable() {
@Override
public void run() {
try {
while (true) {
Thread.sleep(500);
System.out.println("thread id:" + Thread.currentThread().getId() + "," + HdfsUtil.exists("/"));
}
} catch (Exception e) {
System.out.println("thread id:" + Thread.currentThread().getId());
e.printStackTrace();
}
}
});
}
///////////////////////////////////
ExecutorService threadPool2 = Executors.newScheduledThreadPool(1);
((ScheduledExecutorService) threadPool2).schedule(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程池2,10s后关闭FileSystem");
FileSystem fs =HdfsUtil.getHadoopFS("hadoop");
fs.close();
} catch (Exception e) {
System.out.println("线程池2");
e.printStackTrace();
}
}
}, 10, TimeUnit.SECONDS);
}
方法二:去掉调用Filesystem#close的代码
2、指南针
方法一:改为使用基础包HdfsUtil工具类。
方法二:去掉调用Filesystem#close的代码
网友评论