In the earlier examples, operating on HDFS files from Java required copying the compiled classes to a node in the hadoop cluster before they could run. In practice, however, it is far more common to operate on HDFS directly from a remote client, as the following example shows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.net.URI;

public class Demo {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * save a single file or a directory to HDFS
     *
     * @param localFile   local file or directory to upload
     * @param dstHdfsPath destination path on HDFS
     * @return true on success, false otherwise
     */
    public boolean saveLocalFile(File localFile, String dstHdfsPath) {
        boolean flag = false;
        String uri = "hdfs://192.168.0.117:9000" + dstHdfsPath;
        try {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            // delSrc = false: keep the local copy; overwrite = true: replace any existing file on HDFS
            fs.copyFromLocalFile(false, true, new Path(localFile.getAbsolutePath()), new Path(uri));
            flag = true;
        } catch (Exception ex) {
            // pass the exception itself so the stack trace is logged,
            // not ex.getMessage(), which SLF4J would silently drop here
            logger.warn("save file to HDFS failed", ex);
            flag = false;
        }
        return flag;
    }

    /**
     * fetch file from HDFS
     *
     * @param hdfsPath  source path on HDFS
     * @param localFile local destination file
     * @return true on success, false otherwise
     */
    public boolean fetchHdfsFile(String hdfsPath, File localFile) {
        boolean flag = false;
        String uri = "hdfs://192.168.0.117:9000" + hdfsPath;
        try {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            // delSrc = false: keep the copy on HDFS after downloading
            fs.copyToLocalFile(false, new Path(uri), new Path(localFile.getAbsolutePath()));
            flag = true;
        } catch (Exception ex) {
            logger.warn("fetch HDFS file failed", ex);
            flag = false;
        }
        return flag;
    }
}
At its core, the program simply calls the relevant methods of the FileSystem class: FileSystem.get() obtains a client bound to the NameNode URI, and copyFromLocalFile() / copyToLocalFile() move data in and out of HDFS.
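For completeness, below is a minimal sketch of driving the class from a remote machine, assuming the hadoop-client dependency is on the classpath. The file paths are hypothetical, and the HADOOP_USER_NAME property is only needed when the cluster uses the default simple authentication and your local username differs from the HDFS user.

import java.io.File;

public class DemoClient {
    public static void main(String[] args) {
        // Assumption: the cluster uses simple authentication and expects the "hadoop" user;
        // adjust or drop this property to match your cluster setup.
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        Demo demo = new Demo();

        // Hypothetical paths, for illustration only
        boolean saved = demo.saveLocalFile(new File("/tmp/data.txt"), "/user/hadoop/data.txt");
        System.out.println("upload succeeded: " + saved);

        boolean fetched = demo.fetchHdfsFile("/user/hadoop/data.txt", new File("/tmp/data-copy.txt"));
        System.out.println("download succeeded: " + fetched);
    }
}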