
Spark Debugging: Common Code Snippets

Some debugging snippets I use.

Count the number of records in each partition

// Assumes javaRDD is a JavaRDD<T>; the trailing `true` preserves partitioning.
List<Tuple2<Integer, Integer>> numOfRecordPerPartition = javaRDD.mapPartitionsWithIndex(
        (Function2<Integer, Iterator<T>, Iterator<Tuple2<Integer, Integer>>>) (index, iterator) -> {
            List<Tuple2<Integer, Integer>> numOfRecord = new ArrayList<>();
            int totalElement = 0;
            // Drain the partition's iterator, counting every record.
            while (iterator.hasNext()) {
                iterator.next();
                totalElement++;
            }
            numOfRecord.add(new Tuple2<>(index, totalElement));
            return numOfRecord.iterator();
        }, true).collect();
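Stripped of the Spark types, the lambda above just drains each partition's iterator and counts elements. A minimal plain-Java sketch of that per-partition logic (using `java.util.AbstractMap.SimpleEntry` in place of Spark's `Tuple2`, and a plain list iterator in place of a partition):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PartitionCount {
    // Count the elements of one partition's iterator, paired with its index.
    static SimpleEntry<Integer, Integer> countPartition(int index, Iterator<?> iterator) {
        int totalElement = 0;
        while (iterator.hasNext()) {  // drain the iterator...
            iterator.next();
            totalElement++;           // ...counting as we go
        }
        return new SimpleEntry<>(index, totalElement);
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("a", "b", "c");
        SimpleEntry<Integer, Integer> result = countPartition(0, partition.iterator());
        System.out.println(result.getKey() + "," + result.getValue()); // prints 0,3
    }
}
```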

Collect the first record of each partition

// Assumes javaRDD is a JavaRDD<T>. Note that an empty partition contributes
// a bare null to the result, which loses that partition's index.
List<Tuple2<Integer, T>> firstRecordPerPartition = javaRDD.mapPartitionsWithIndex(
        (Function2<Integer, Iterator<T>, Iterator<Tuple2<Integer, T>>>) (index, iterator) -> {
            List<Tuple2<Integer, T>> record = new ArrayList<>();
            if (iterator.hasNext()) {
                // Take only the first record of this partition.
                record.add(new Tuple2<>(index, iterator.next()));
            } else {
                record.add(null);
            }
            return record.iterator();
        }, true).collect();
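Because the empty-partition branch adds a bare `null`, the collected list no longer tells you *which* partition was empty. A plain-Java sketch of an alternative that keeps the index and wraps the record in an `Optional` instead (again with `SimpleEntry` standing in for `Tuple2`; names here are illustrative, not from the original):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;

public class FirstRecord {
    // Take the first element of one partition's iterator, keeping the
    // partition index even when the partition is empty.
    static <T> SimpleEntry<Integer, Optional<T>> firstOf(int index, Iterator<T> iterator) {
        Optional<T> first = iterator.hasNext() ? Optional.of(iterator.next()) : Optional.empty();
        return new SimpleEntry<>(index, first);
    }

    public static void main(String[] args) {
        // Non-empty partition keeps its first record; empty one keeps its index.
        System.out.println(firstOf(0, Arrays.asList("a", "b").iterator()));
        System.out.println(firstOf(1, java.util.Collections.<String>emptyIterator()));
    }
}
```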

Inspect the preferred locations of the current RDD's partitions

// Each Seq<String> is the list of hosts where that partition's data lives.
// The public accessor rdd().preferredLocations(partition) should work here
// as well, and additionally takes checkpointing into account.
Partition[] partitions = javaRDD.rdd().getPartitions();
List<Seq<String>> preferredLocationsList = new ArrayList<>();
for (Partition partition : partitions) {
    preferredLocationsList.add(javaRDD.rdd().getPreferredLocations(partition));
}