
Spark Notes: Specifying Which HDFS Nodes a File Is Written To

A project required writing files to specific HDFS nodes. After searching through the API, I found one way to do it; the test below uses TextOutputFormat as the example.

write

Look at the write method in TextOutputFormat: it writes each record to HDFS through out.write. So the goal is to find where out comes from and change how it is created.

public synchronized void write(K key, V value)
    throws IOException {

  boolean nullKey = key == null || key instanceof NullWritable;
  boolean nullValue = value == null || value instanceof NullWritable;
  if (nullKey && nullValue) {
    return;
  }
  if (!nullKey) {
    writeObject(key);
  }
  if (!(nullKey || nullValue)) {
    out.write(keyValueSeparator);
  }
  if (!nullValue) {
    writeObject(value);
  }
  out.write(newline);
}

getRecordWriter

out is a DataOutputStream, and it is obtained through getRecordWriter.

By default it is produced by FSDataOutputStream fileOut = fs.create(file, progress);, which gives no way to specify the target nodes.
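
For comparison, the default (uncompressed) path of TextOutputFormat.getRecordWriter looks roughly like the sketch below; this is a Scala rendering for illustration only, since the real Hadoop implementation is Java and also handles compressed output. The point is that the plain FileSystem.create(Path, Progressable) call leaves DataNode placement entirely to the NameNode.

override def getRecordWriter(ignored: FileSystem, job: JobConf, name: String,
                             progress: Progressable): RecordWriter[K, V] = {
  val keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t")
  val file = FileOutputFormat.getTaskOutputPath(job, name)
  val fs = file.getFileSystem(job) // a generic FileSystem, not necessarily HDFS
  // FileSystem.create(Path, Progressable) has no parameter for favored nodes;
  // the NameNode picks the target DataNodes on its own.
  val fileOut: FSDataOutputStream = fs.create(file, progress)
  new TextOutputFormat.LineRecordWriter[K, V](fileOut, keyValueSeparator)
}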

The modification:

The key is to replace this call with the create method of DistributedFileSystem, which has a favoredNodes parameter; as the name suggests, the data will preferentially be stored on the nodes it specifies.

override def getRecordWriter(ignored: FileSystem, job: JobConf, name: String,
                             progress: Progressable): RecordWriter[K, V] = {

  val keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t")

  val file = FileOutputFormat.getTaskOutputPath(job, name)

  // Cast the FileSystem to DistributedFileSystem so the favoredNodes overload of create is available
  val fs = file.getFileSystem(job).asInstanceOf[DistributedFileSystem]

  val favoredNodes = new Array[InetSocketAddress](3)

  // Specify the nodes yourself
  favoredNodes(0) = new InetSocketAddress("10.0.0.12", 9866) // 9866 is the DataNode port on that node
  favoredNodes(1) = new InetSocketAddress("10.0.0.13", 9866)
  favoredNodes(2) = new InetSocketAddress("10.0.0.17", 9866)

  val fileOut: FSDataOutputStream = fs.create(file,
    FsPermission.getFileDefault.applyUMask(FsPermission.getUMask(fs.getConf)),
    true,
    fs.getConf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
    fs.getDefaultReplication(file),
    fs.getDefaultBlockSize(file),
    null,
    favoredNodes)

  new TextOutputFormat.LineRecordWriter[K, V](fileOut, keyValueSeparator)
}
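
As a side note, the addresses would normally not be hard-coded. Below is a minimal sketch of a helper that reads them from the JobConf instead, meant to be dropped into the output format class (InetSocketAddress and JobConf are already imported there); the configuration key favored.nodes.list and the name favoredNodesFrom are made up for this example.

// Hypothetical helper: parse "host:port,host:port,..." from a made-up JobConf key
// into the favoredNodes array passed to DistributedFileSystem.create.
def favoredNodesFrom(job: JobConf): Array[InetSocketAddress] =
  job.get("favored.nodes.list", "")
    .split(",")
    .filter(_.trim.nonEmpty)
    .map { hostPort =>
      val Array(host, port) = hostPort.trim.split(":")
      new InetSocketAddress(host, port.toInt)
    }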

Complete test code

package org.apache.spark.examples.zxyTest

import java.net.InetSocketAddress

import org.apache.hadoop.fs.CommonConfigurationKeysPublic.{IO_FILE_BUFFER_SIZE_DEFAULT, IO_FILE_BUFFER_SIZE_KEY}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, RecordWriter, TextOutputFormat}
import org.apache.hadoop.util.Progressable
import org.apache.spark.sql.SparkSession

/**
 * Test of specifying the HDFS storage nodes.
 */
object FavoredNodesTest {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .appName("FavoredNodesTest")
      .master("local[*]")
      .config("spark.driver.memory", "2g")
      .getOrCreate()

    val sc = spark.sparkContext

    val rdd = sc.makeRDD(Array(("A", 1), ("B", 2), ("C", 3), ("D", 4)), 2)

    rdd.saveAsHadoopFile("hdfs://dell-r720/zxyTest/output",
      classOf[Text],
      classOf[IntWritable],
      classOf[MyTextOutputFormat[Text, IntWritable]])

    println("Result has been saved")
    spark.stop()
  }
}

class MyTextOutputFormat[K, V] extends TextOutputFormat[K, V] {
  override def getRecordWriter(ignored: FileSystem, job: JobConf, name: String,
                               progress: Progressable): RecordWriter[K, V] = {

    val keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t")

    val file = FileOutputFormat.getTaskOutputPath(job, name)
    val fs = file.getFileSystem(job).asInstanceOf[DistributedFileSystem]

    // Locations for the 3 replicas
    val favoredNodes = new Array[InetSocketAddress](3)
    favoredNodes(0) = new InetSocketAddress("10.0.0.12", 9866) // 9866 is the DataNode port on that node
    favoredNodes(1) = new InetSocketAddress("10.0.0.13", 9866)
    favoredNodes(2) = new InetSocketAddress("10.0.0.17", 9866)

    val fileOut: FSDataOutputStream = fs.create(file,
      FsPermission.getFileDefault.applyUMask(FsPermission.getUMask(fs.getConf)),
      true,
      fs.getConf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
      fs.getDefaultReplication(file),
      fs.getDefaultBlockSize(file),
      null,
      favoredNodes)

    new TextOutputFormat.LineRecordWriter[K, V](fileOut, keyValueSeparator)
  }
}
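
One caveat: favoredNodes is only a hint, so if a favored DataNode cannot be used the NameNode falls back to its normal block placement policy. It is therefore worth checking where the blocks actually ended up. Below is a minimal sketch using FileSystem.getFileBlockLocations, assuming the output path from the test above and the default part file name:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Print the DataNodes that hold each block of one output part file.
object BlockLocationCheck {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val path = new Path("hdfs://dell-r720/zxyTest/output/part-00000")
    val fs: FileSystem = path.getFileSystem(conf)
    val status = fs.getFileStatus(path)
    val blocks = fs.getFileBlockLocations(status, 0, status.getLen)
    blocks.foreach { b =>
      println(s"offset=${b.getOffset} length=${b.getLength} hosts=${b.getHosts.mkString(",")}")
    }
    fs.close()
  }
}

The same information can also be checked from the command line with hdfs fsck /zxyTest/output -files -blocks -locations.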