package org.apache.spark.examples.zxyTest
import java.net.InetSocketAddress
import org.apache.hadoop.fs.CommonConfigurationKeysPublic.{IO_FILE_BUFFER_SIZE_DEFAULT, IO_FILE_BUFFER_SIZE_KEY} import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem} import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.io.{IntWritable, Text} import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, RecordWriter, TextOutputFormat} import org.apache.hadoop.util.Progressable import org.apache.spark.sql.SparkSession
object FavoredNodesTest {

  /** Entry point: writes a small key/value RDD to HDFS through a custom
    * output format ([[MyTextOutputFormat]]) that requests favored DataNodes
    * for block placement. Runs locally with all cores.
    */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("FavoredNodesTest")
      .master("local[*]")
      .config("spark.driver.memory", "2g")
      .getOrCreate()
    val sc = spark.sparkContext

    // Two partitions -> two write tasks, each going through the custom format.
    val rdd = sc.makeRDD(Array(("A", 1), ("B", 2), ("C", 3), ("D", 4)), 2)

    rdd.saveAsHadoopFile(
      "hdfs://dell-r720/zxyTest/output",
      classOf[Text],
      classOf[IntWritable],
      classOf[MyTextOutputFormat[Text, IntWritable]])

    println("Result has been saved")
    spark.stop()
  }
}
/** A [[TextOutputFormat]] variant that asks HDFS to place the blocks of each
  * task's output file on a fixed set of favored DataNodes
  * (10.0.0.12 / 10.0.0.13 / 10.0.0.17, port 9866).
  *
  * NOTE(review): the downcast to [[DistributedFileSystem]] will throw on any
  * non-HDFS filesystem (e.g. the local fs in tests) — favored-node placement
  * is an HDFS-only API, so this is intentional, but callers should know.
  */
class MyTextOutputFormat[K, V] extends TextOutputFormat[K, V] {

  override def getRecordWriter(
      ignored: FileSystem,
      job: JobConf,
      name: String,
      progress: Progressable): RecordWriter[K, V] = {
    // Same separator lookup TextOutputFormat itself performs (default: TAB).
    val keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t")
    val file = FileOutputFormat.getTaskOutputPath(job, name)
    // Favored-node create() is DistributedFileSystem-specific, hence the cast.
    val fs = file.getFileSystem(job).asInstanceOf[DistributedFileSystem]

    // Preferred DataNodes for this task's output blocks (9866 = DataNode IPC).
    val favoredNodes = Array(
      new InetSocketAddress("10.0.0.12", 9866),
      new InetSocketAddress("10.0.0.13", 9866),
      new InetSocketAddress("10.0.0.17", 9866))

    // BUG FIX: the original passed `null` for the Progressable, silently
    // discarding the framework's progress reporter; long writes could then be
    // considered hung. Forward the `progress` handed in by the caller.
    val fileOut: FSDataOutputStream = fs.create(
      file,
      FsPermission.getFileDefault.applyUMask(FsPermission.getUMask(fs.getConf)),
      true, // overwrite any existing task output
      fs.getConf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
      fs.getDefaultReplication(file),
      fs.getDefaultBlockSize(file),
      progress,
      favoredNodes)

    new TextOutputFormat.LineRecordWriter[K, V](fileOut, keyValueSeparator)
  }
}