Getting started with reading and writing HDFS via the Java API
In the previous article on installing Hadoop, we set up MapReduce and HDFS. Next, let's look at how to read and write HDFS files from Java.
Maven mirror
I'm using IntelliJ IDEA for Java development and Maven for dependency management. Since downloads from the official central repository are slow, the first step is to point Maven at a mirror.
This is usually done in the ~/.m2/settings.xml file:
<settings>
  <mirrors>
    <mirror>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      <mirrorOf>central</mirrorOf>
    </mirror>
  </mirrors>
</settings>
Maven dependencies
With the mirror configured, the next step is to add the Hadoop dependencies to the project.
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>3.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>3.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <version>3.3.0</version>
</dependency>
<!-- Note: the legacy hadoop-core artifact (1.x) was split into hadoop-common/hadoop-hdfs in Hadoop 2.x;
     mixing hadoop-core 1.2.1 with 3.3.0 artifacts causes class conflicts, so it is not included here. -->
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>3.3.0</version>
</dependency>
Reading and writing with the HDFS API
The HDFS Java API is straightforward to use and feels much like reading and writing local files. The main difference is that HDFS files cannot be modified at arbitrary offsets; they can only be created or appended to. Below are some test examples for reference.
package com.gavinzh.learn.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
import java.net.URL;
/**
* @author jiangmitiao
*/
public class LearnMain {
    /**
     * Check whether a path exists on HDFS
     */
    public static boolean exists(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.exists(new Path(path));
    }

    /**
     * Copy a local file to the given HDFS path, overwriting it if it already exists
     */
    public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath)
            throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path localPath = new Path(localFilePath);
        Path remotePath = new Path(remoteFilePath);
        // The first argument of fs.copyFromLocalFile controls whether the source file is deleted,
        // the second whether the destination is overwritten
        fs.copyFromLocalFile(false, true, localPath, remotePath);
        fs.close();
    }
    /**
     * Download a file to the local file system; if the local path already exists, pick a new name automatically
     */
    public static void copyToLocal(Configuration conf, String remoteFilePath, String localFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        File f = new File(localFilePath);
        // If the file already exists, rename it automatically (append _0, _1, ... to the name)
        if (f.exists()) {
            System.out.println(localFilePath + " already exists.");
            int i = 0;
            while (true) {
                f = new File(localFilePath + "_" + i);
                if (!f.exists()) {
                    localFilePath = localFilePath + "_" + i;
                    break;
                }
                i++;
            }
            System.out.println("Renaming to: " + localFilePath);
        }
        // Download the file to the local path
        Path localPath = new Path(localFilePath);
        fs.copyToLocalFile(remotePath, localPath);
        fs.close();
    }
    /**
     * Append the content of a local file to an HDFS file
     */
    public static void appendToFile(Configuration conf, String localFilePath, String remoteFilePath)
            throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        // Open an input stream on the local file
        FileInputStream in = new FileInputStream(localFilePath);
        // Open an output stream whose writes are appended to the end of the HDFS file
        FSDataOutputStream out = fs.append(remotePath);
        // Copy the data in chunks
        byte[] data = new byte[1024];
        int read = -1;
        while ((read = in.read(data)) > 0) {
            out.write(data, 0, read);
        }
        out.close();
        in.close();
        fs.close();
    }
    /**
     * Access an HDFS file through a java.net.URL
     */
    public static void useURLConnectHDFS(Configuration conf, String remoteFilePath)
            throws IOException {
        // Note: setURLStreamHandlerFactory may only be called once per JVM
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory(conf));
        URL url = new URL("hdfs", "192.168.10.103", 8999, remoteFilePath);
        BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream()));
        for (int i = 0; i < 8; i++) {
            System.out.println(reader.readLine());
        }
        reader.close();
    }
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Address of the pseudo-distributed Hadoop setup (fs.default.name is deprecated in favor of fs.defaultFS)
        conf.set("fs.defaultFS", "hdfs://192.168.10.103:8999");
        // Local path ("~" is not expanded by Java, so build the path from user.home)
        String localFilePath = System.getProperty("user.home") + "/Downloads/to_kill_a_mockingbird_from_hdfs.txt";
        // HDFS path
        String remoteFilePath = "/test/to_kill_a_mockingbird.txt";
        boolean exist = exists(conf, remoteFilePath);
        if (exist) {
            // If it exists, try reading the remote file
            useURLConnectHDFS(conf, remoteFilePath);
        } else {
            // If not, upload the local file
            copyFromLocalFile(conf, localFilePath, remoteFilePath);
        }
    }
}
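
The class above only reads the remote file through a java.net.URL. As a complement, here is a minimal sketch of writing and reading a file directly through the FileSystem API with fs.create and fs.open. It reuses the same address as above; the class name ReadWriteSketch and the path /test/hello_hdfs.txt are made-up placeholders for illustration, so adjust them to your own setup.
package com.gavinzh.learn.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ReadWriteSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Same pseudo-distributed address as above; change it to match your cluster
        conf.set("fs.defaultFS", "hdfs://192.168.10.103:8999");
        FileSystem fs = FileSystem.get(conf);

        // Hypothetical example file used only for this sketch
        Path path = new Path("/test/hello_hdfs.txt");

        // Write: fs.create returns an output stream to a new file (true = overwrite if it exists)
        try (FSDataOutputStream out = fs.create(path, true)) {
            out.write("hello hdfs\n".getBytes(StandardCharsets.UTF_8));
        }

        // Read: fs.open returns an input stream; IOUtils.copyBytes dumps it to stdout
        try (FSDataInputStream in = fs.open(path)) {
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
        fs.close();
    }
}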