pyspark.sql.DataFrame.mapInPandas#

DataFrame.mapInPandas(func, schema, barrier=False, profile=None)[source]#

Maps an iterator of batches in the current DataFrame using a Python native function that takes and outputs pandas DataFrames, and returns the result as a DataFrame.

This method applies the specified Python function to an iterator of pandas.DataFrames, each representing a batch of rows from the original DataFrame. The returned iterator of pandas.DataFrames is combined into a single DataFrame. The sizes of the function’s input and output can differ. The size of each input pandas.DataFrame can be controlled by spark.sql.execution.arrow.maxRecordsPerBatch.
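The iterator-in/iterator-out contract can be illustrated without a Spark session. The following is a minimal sketch using plain pandas (no Spark) that shows the function receiving batches lazily and emitting a different number of rows than it was given; in Spark, the batch boundaries would be determined by spark.sql.execution.arrow.maxRecordsPerBatch rather than constructed by hand.

```python
from typing import Iterator

import pandas as pd


def double_rows(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # Same signature that mapInPandas expects: an iterator of
    # pandas DataFrames in, an iterator of pandas DataFrames out.
    for pdf in batches:
        # Output size may differ from input size: each batch is
        # yielded twice, so the result has twice as many rows.
        yield pdf
        yield pdf


# Hand-built stand-in for the batches Spark would feed the function.
batches = iter([pd.DataFrame({"id": [1], "age": [21]}),
                pd.DataFrame({"id": [2], "age": [30]})])

# Combine the output batches, as Spark would when building the result.
result = pd.concat(double_rows(batches), ignore_index=True)
print(len(result))  # 4: each 1-row input batch was yielded twice
```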

New in version 3.0.0.

Changed in version 3.4.0: Supports Spark Connect.

Parameters
funcfunction

a Python native function that takes an iterator of pandas.DataFrames, and outputs an iterator of pandas.DataFrames.

schemapyspark.sql.types.DataType or str

the return type of the func in PySpark. The value can be either a pyspark.sql.types.DataType object or a DDL-formatted type string.

barrierbool, optional, default False

Use barrier mode execution, ensuring that all Python workers in the stage will be launched concurrently.

profilepyspark.resource.ResourceProfile, optional

the optional ResourceProfile to be used for mapInPandas.

Notes

This API is experimental.

Examples

>>> df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

Filter rows with id equal to 1:

>>> def filter_func(iterator):
...     for pdf in iterator:
...         yield pdf[pdf.id == 1]
...
>>> df.mapInPandas(filter_func, df.schema).show()  
+---+---+
| id|age|
+---+---+
|  1| 21|
+---+---+

Compute the mean age for each id:

>>> def mean_age(iterator):
...     for pdf in iterator:
...         yield pdf.groupby("id").mean().reset_index()
...
>>> df.mapInPandas(mean_age, "id: bigint, age: double").show()  
+---+----+
| id| age|
+---+----+
|  1|21.0|
|  2|30.0|
+---+----+

Add a new column with the double of the age:

>>> def double_age(iterator):
...     for pdf in iterator:
...         pdf["double_age"] = pdf["age"] * 2
...         yield pdf
...
>>> df.mapInPandas(
...     double_age, "id: bigint, age: bigint, double_age: bigint").show()  
+---+---+----------+
| id|age|double_age|
+---+---+----------+
|  1| 21|        42|
|  2| 30|        60|
+---+---+----------+

Set barrier to True to force the mapInPandas stage to run in barrier mode, ensuring that all Python workers in the stage are launched concurrently.

>>> df.mapInPandas(filter_func, df.schema, barrier=True).show()  
+---+---+
| id|age|
+---+---+
|  1| 21|
+---+---+
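The profile parameter has no example above. The following is a hedged sketch of one way to pass a ResourceProfile, assuming a cluster manager that supports stage-level scheduling; the exact resources you can request depend on your cluster configuration, and df and filter_func are the DataFrame and function from the examples above.

```python
from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests

# Request 2 CPU cores per task for this stage only.
treqs = TaskResourceRequests().cpus(2)
rp = ResourceProfileBuilder().require(treqs).build

# Run the same filter, but with the custom resource profile applied
# to the mapInPandas stage.
df.mapInPandas(filter_func, df.schema, profile=rp).show()
```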