pyspark.RDD.saveAsHadoopDataset
RDD.saveAsHadoopDataset(conf: Dict[str, str], keyConverter: Optional[str] = None, valueConverter: Optional[str] = None) → None

Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system, using the old Hadoop OutputFormat API (mapred package). Keys and values are converted for output using either user-specified converters or, by default, "org.apache.spark.api.python.JavaToWritableConverter".

New in version 1.1.0.
Parameters
conf : dict
    Hadoop job configuration
keyConverter : str, optional
    fully qualified classname of key converter (None by default)
valueConverter : str, optional
    fully qualified classname of value converter (None by default)
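To illustrate what the conf dictionary can contain, here is a minimal sketch of a helper that assembles one. The function name build_write_conf and the path /tmp/old_hadoop_file are hypothetical and not part of PySpark. The four property names are the ones used in the example on this page, and other output formats may need different properties.

```python
from typing import Dict


def build_write_conf(
    output_dir: str,
    key_class: str = "org.apache.hadoop.io.IntWritable",
    value_class: str = "org.apache.hadoop.io.Text",
    output_format_class: str = "org.apache.hadoop.mapred.TextOutputFormat",
) -> Dict[str, str]:
    """Hypothetical helper (not a PySpark API): build a conf dict for saveAsHadoopDataset."""
    return {
        # Old-API (mapred) OutputFormat used to write the records
        "mapred.output.format.class": output_format_class,
        # Writable classes for the keys and values
        "mapreduce.job.output.key.class": key_class,
        "mapreduce.job.output.value.class": value_class,
        # Destination directory on the Hadoop file system
        "mapreduce.output.fileoutputformat.outputdir": output_dir,
    }


write_conf = build_write_conf("/tmp/old_hadoop_file")
# With a live SparkContext `sc` (assumed, not created here) this would be used as:
#   sc.parallelize([(1, "a")]).saveAsHadoopDataset(conf=write_conf)
```

All keys and values are plain strings, matching the Dict[str, str] type in the signature.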
Examples
>>> import os
>>> import tempfile
Set the related classes
>>> output_format_class = "org.apache.hadoop.mapred.TextOutputFormat"
>>> input_format_class = "org.apache.hadoop.mapred.TextInputFormat"
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
>>> with tempfile.TemporaryDirectory() as d:
...     path = os.path.join(d, "old_hadoop_file")
...
...     # Create the conf for writing
...     write_conf = {
...         "mapred.output.format.class": output_format_class,
...         "mapreduce.job.output.key.class": key_class,
...         "mapreduce.job.output.value.class": value_class,
...         "mapreduce.output.fileoutputformat.outputdir": path,
...     }
...
...     # Write a temporary Hadoop file
...     rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])
...     rdd.saveAsHadoopDataset(conf=write_conf)
...
...     # Create the conf for reading
...     read_conf = {"mapreduce.input.fileinputformat.inputdir": path}
...
...     # Load this Hadoop file as an RDD
...     loaded = sc.hadoopRDD(input_format_class, key_class, value_class, conf=read_conf)
...     sorted(loaded.collect())
[(0, '1\t'), (0, '1\ta'), (0, '3\tx')]
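The result above shows that reading the text output back does not return the original pairs: each loaded value is a whole line holding the written key and value separated by a tab, and the loaded key (0 in every record here) is not the original key. As a rough sketch, the original pairs could be recovered in plain Python as follows. The helper parse_text_records is hypothetical, and it assumes integer keys and values that contain no tab or newline characters.

```python
from typing import List, Tuple


def parse_text_records(records: List[Tuple[int, str]]) -> List[Tuple[int, str]]:
    """Hypothetical helper: turn (loaded_key, "key<TAB>value") records back into (key, value)."""
    parsed = []
    for _loaded_key, line in records:
        # Split on the first tab only; everything after it is the value
        key_text, _sep, value = line.partition("\t")
        parsed.append((int(key_text), value))
    return parsed


# The collected output from the example above
loaded = [(0, "1\t"), (0, "1\ta"), (0, "3\tx")]
recovered = parse_text_records(loaded)
print(recovered)  # [(1, ''), (1, 'a'), (3, 'x')]
```

On a real RDD the same per-record logic could be applied with a map over the loaded data instead of on the collected list.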