In the previous article on installing Hadoop, we set up MapReduce and HDFS. Next, let's look at how to read and write HDFS files from Java.

maven mirror

I use IDEA for Java development and Maven for dependency management. Since downloads from the official repository are slow, the first step is to point Maven at a mirror repository.

This is usually done by editing the ~/.m2/settings.xml file:

<settings>
    <mirrors>
        <mirror>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <mirrorOf>central</mirrorOf>
        </mirror>
    </mirrors>
</settings>

maven dependencies

With the mirror configured, the next step is to add the Hadoop dependencies to the project.

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>3.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.3.0</version>
</dependency>

hdfs api read and write

The HDFS Java API is straightforward to use and feels much like reading and writing local files. The only difference is that HDFS files do not support random writes: you can only create new files or append to existing ones. Below are some test examples for reference.

package com.gavinzh.learn.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.*;
import java.net.URL;

/**
 * @author jiangmitiao
 */
public class LearnMain {

    /**
     * Check whether a path exists on HDFS
     */
    public static boolean exists(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.exists(new Path(path));
    }

    /**
     * Copy a local file to the given HDFS path, overwriting it if the path already exists
     */
    public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath)
        throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path localPath = new Path(localFilePath);
        Path remotePath = new Path(remoteFilePath);
        // The first flag of fs.copyFromLocalFile controls whether the source file is deleted,
        // the second whether an existing target is overwritten
        fs.copyFromLocalFile(false, true, localPath, remotePath);
        fs.close();
    }

    /**
     * Download a file to the local filesystem; if the local path already exists,
     * rename the download automatically
     */
    public static void copyToLocal(Configuration conf, String remoteFilePath, String localFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        File f = new File(localFilePath);
        // If the file already exists, rename automatically by appending _0, _1, ...
        if (f.exists()) {
            System.out.println(localFilePath + " already exists.");
            int i = 0;
            while (true) {
                f = new File(localFilePath + "_" + i);
                if (!f.exists()) {
                    localFilePath = localFilePath + "_" + i;
                    break;
                }
                i++;
            }
            System.out.println("Renaming to: " + localFilePath);
        }

        // Download the file to the local path
        Path localPath = new Path(localFilePath);
        fs.copyToLocalFile(remotePath, localPath);
        fs.close();
    }

    /**
     * Append the contents of a local file to an HDFS file
     */
    public static void appendToFile(Configuration conf, String localFilePath, String remoteFilePath)
        throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        // Input stream for the local file
        FileInputStream in = new FileInputStream(localFilePath);
        // Output stream whose writes are appended to the end of the HDFS file
        FSDataOutputStream out = fs.append(remotePath);
        // Copy the content in chunks
        byte[] data = new byte[1024];
        int read = -1;
        while ((read = in.read(data)) > 0) {
            out.write(data, 0, read);
        }
        out.close();
        in.close();
        fs.close();
    }

    /**
     * Read an HDFS file through the java.net.URL API
     */
    public static void useURLConnectHDFS(Configuration conf, String remoteFilePath)
        throws IOException {
        // Register the hdfs:// URL scheme; this can only be done once per JVM
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory(conf));
        URL url = new URL("hdfs", "192.168.10.103", 8999, remoteFilePath);

        BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream()));

        // Print the first few lines of the file
        for (int i = 0; i < 8; i++) {
            System.out.println(reader.readLine());
        }
        reader.close();
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Address of the pseudo-distributed Hadoop setup (fs.default.name is the deprecated name for this key)
        conf.set("fs.defaultFS", "hdfs://192.168.10.103:8999");
        // Local path (note: Java does not expand ~, so use an absolute path here)
        String localFilePath = "~/Downloads/to_kill_a_mockingbird_from_hdfs.txt";
        // HDFS path
        String remoteFilePath = "/test/to_kill_a_mockingbird.txt";

        boolean exist = exists(conf, remoteFilePath);

        if (exist) {
            // If the file exists, try reading it from HDFS
            useURLConnectHDFS(conf, remoteFilePath);
        } else {
            // Otherwise upload the local file
            copyFromLocalFile(conf, localFilePath, remoteFilePath);
        }

    }
}
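
The class above covers copying, appending and URL-based reads. For completeness, here is a minimal sketch of writing and then reading a file directly through FSDataOutputStream and FSDataInputStream, which is the closest HDFS gets to ordinary stream I/O. It assumes the same pseudo-distributed address as above; the class name and the /test/stream_demo.txt path are made up for illustration and should be adjusted to your setup.

package com.gavinzh.learn.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class StreamReadWrite {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Same pseudo-distributed address as in the example above (adjust to your cluster)
        conf.set("fs.defaultFS", "hdfs://192.168.10.103:8999");
        FileSystem fs = FileSystem.get(conf);

        // Hypothetical demo path on HDFS
        Path path = new Path("/test/stream_demo.txt");

        // Write: fs.create overwrites the file when the second argument is true
        try (FSDataOutputStream out = fs.create(path, true)) {
            out.write("hello hdfs\n".getBytes(StandardCharsets.UTF_8));
        }

        // Read: fs.open returns an FSDataInputStream, usable like any InputStream
        try (FSDataInputStream in = fs.open(path);
             BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }

        fs.close();
    }
}

As the paragraph at the top of this section notes, there is no way to seek into the middle of an existing HDFS file and overwrite it; to change content you either rewrite the whole file or append to the end.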

Tags: java, hadoop, hdfs
