pyspark.sql.functions.udtf

pyspark.sql.functions.udtf(cls=None, *, returnType=None, useArrow=None)

Creates a user-defined table function (UDTF).

New in version 3.5.0.

Changed in version 4.0.0: Supports Python side analysis.

Changed in version 4.0.0: Supports keyword-arguments.

Parameters
cls : class, optional

the Python user-defined table function handler class.

returnType : pyspark.sql.types.StructType or str, optional

the return type of the user-defined table function. The value can be either a pyspark.sql.types.StructType object or a DDL-formatted struct type string. If None, the handler class must provide an analyze static method.

useArrow : bool, optional

whether to use Arrow to optimize the (de)serializations. When it is set to None, the Spark config “spark.sql.execution.pythonUDTF.arrow.enabled” is used.
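
For example, a minimal sketch (assuming an active spark session; Echo is just an illustrative handler name) that leaves useArrow unset and controls the behavior through the session config instead:

>>> from pyspark.sql.functions import udtf
>>> # With useArrow left as None, this session config decides whether Arrow is
>>> # used for the UDTF's (de)serializations.
>>> spark.conf.set("spark.sql.execution.pythonUDTF.arrow.enabled", "true")
>>> @udtf(returnType="r: int")
... class Echo:
...     def eval(self, x: int):
...         yield x,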

Notes

User-defined table functions (UDTFs) are considered non-deterministic by default. Use asDeterministic() to mark a function as deterministic. E.g.:

>>> class PlusOne:
...     def eval(self, a: int):
...         yield a + 1,
>>> plus_one = udtf(PlusOne, returnType="r: int").asDeterministic()

Use “yield” to produce one row of the UDTF result relation at a time, as many times as needed. In the context of a lateral join, each such result row will be associated with the most recent input row consumed by the “eval” method.
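
As an illustration, a sketch (with a hypothetical handler name RepeatRange) of a UDTF that yields several result rows per input value:

>>> from pyspark.sql.functions import udtf, lit
>>> @udtf(returnType="n: int")
... class RepeatRange:
...     def eval(self, n: int):
...         # Yield one result row per value; in a lateral join, every row
...         # yielded here is paired with the input row that supplied "n".
...         for i in range(n):
...             yield i,
...
>>> RepeatRange(lit(3)).show()
+---+
|  n|
+---+
|  0|
|  1|
|  2|
+---+
>>> # After registration, the same handler could be used with LATERAL in SQL,
>>> # e.g. SELECT * FROM t, LATERAL repeat_range(t.id).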

User-defined table functions are considered opaque to the optimizer by default. As a result, operations like filters from WHERE clauses or limits from LIMIT/OFFSET clauses that appear after the UDTF call will execute on the UDTF’s result relation. By the same token, any relation forwarded as input to a UDTF will plan as a full table scan unless such filtering or other logic is written explicitly in a table subquery surrounding the provided input relation.
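
For example, a sketch with hypothetical names (a pass-through handler and a temp view t, assuming an active spark session): both queries below return the same rows, but only the second filters the input relation inside a table subquery before it is forwarded to the UDTF:

>>> from pyspark.sql import Row
>>> from pyspark.sql.functions import udtf
>>> @udtf(returnType="id: bigint")
... class PassThrough:
...     def eval(self, row: Row):
...         # Forward the "id" column of each input row unchanged.
...         yield row["id"],
...
>>> _ = spark.udtf.register("pass_through", PassThrough)
>>> spark.range(5).createOrReplaceTempView("t")
>>> # The WHERE clause here executes on the UDTF's result relation:
>>> spark.sql("SELECT * FROM pass_through(TABLE(t)) WHERE id < 2").show()
+---+
| id|
+---+
|  0|
|  1|
+---+
>>> # The table subquery here filters the input relation before the UDTF runs:
>>> spark.sql(
...     "SELECT * FROM pass_through(TABLE(SELECT * FROM t WHERE id < 2))"
... ).show()
+---+
| id|
+---+
|  0|
|  1|
+---+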

Prior to version 4.0.0, user-defined table functions did not accept keyword arguments on the calling side; since 4.0.0 they do, as shown in the examples below.

Examples

Implement the UDTF class and create a UDTF:

>>> from typing import Any
>>> from pyspark.sql.functions import udtf
>>> class TestUDTF:
...     def eval(self, *args: Any):
...         yield "hello", "world"
...
>>> test_udtf = udtf(TestUDTF, returnType="c1: string, c2: string")
>>> test_udtf().show()
+-----+-----+
|   c1|   c2|
+-----+-----+
|hello|world|
+-----+-----+
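
The same return schema can equivalently be passed as a pyspark.sql.types.StructType object; a brief sketch reusing the handler above (test_udtf2 is just an illustrative name):

>>> from pyspark.sql.types import StructType, StringType
>>> schema = StructType().add("c1", StringType()).add("c2", StringType())
>>> test_udtf2 = udtf(TestUDTF, returnType=schema)
>>> test_udtf2().show()
+-----+-----+
|   c1|   c2|
+-----+-----+
|hello|world|
+-----+-----+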

A UDTF can also be created using the decorator syntax:

>>> @udtf(returnType="c1: int, c2: int")
... class PlusOne:
...     def eval(self, x: int):
...         yield x, x + 1
...
>>> from pyspark.sql.functions import lit
>>> PlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
|  1|  2|
+---+---+

A UDTF can also define an analyze static method instead of a static return type:

The analyze static method should take arguments:

  • The number and order of arguments are the same as the UDTF inputs

  • Each argument is a pyspark.sql.udtf.AnalyzeArgument, containing:

      - dataType: DataType
      - value: Any: the calculated value if the argument is foldable; otherwise None
      - isTable: bool: True if the argument is a table argument

and return a pyspark.sql.udtf.AnalyzeResult, containing:

  • schema: StructType

>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
>>> # or from pyspark.sql.functions import AnalyzeArgument, AnalyzeResult
>>> @udtf
... class TestUDTFWithAnalyze:
...     @staticmethod
...     def analyze(a: AnalyzeArgument, b: AnalyzeArgument) -> AnalyzeResult:
...         return AnalyzeResult(StructType().add("a", a.dataType).add("b", b.dataType))
...
...     def eval(self, a, b):
...         yield a, b
...
>>> TestUDTFWithAnalyze(lit(1), lit("x")).show()
+---+---+
|  a|  b|
+---+---+
|  1|  x|
+---+---+
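
The other AnalyzeArgument fields can also drive the computed schema; a sketch (hypothetical handler RepeatColumns) that reads the constant value of a foldable argument to decide how many output columns to produce:

>>> from pyspark.sql.types import IntegerType, StructType
>>> @udtf
... class RepeatColumns:
...     @staticmethod
...     def analyze(n: AnalyzeArgument) -> AnalyzeResult:
...         # n.value holds the constant because lit(...) is foldable.
...         schema = StructType()
...         for i in range(n.value):
...             schema = schema.add(f"c{i}", IntegerType())
...         return AnalyzeResult(schema)
...
...     def eval(self, n: int):
...         yield tuple(range(n))
...
>>> RepeatColumns(lit(2)).show()
+---+---+
| c0| c1|
+---+---+
|  0|  1|
+---+---+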

A UDTF can use keyword arguments:

>>> @udtf
... class TestUDTFWithKwargs:
...     @staticmethod
...     def analyze(
...         a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
...     ) -> AnalyzeResult:
...         return AnalyzeResult(
...             StructType().add("a", a.dataType)
...                 .add("b", b.dataType)
...                 .add("x", kwargs["x"].dataType)
...         )
...
...     def eval(self, a, b, **kwargs):
...         yield a, b, kwargs["x"]
...
>>> TestUDTFWithKwargs(lit(1), x=lit("x"), b=lit("b")).show()
+---+---+---+
|  a|  b|  x|
+---+---+---+
|  1|  b|  x|
+---+---+---+
>>> _ = spark.udtf.register("test_udtf", TestUDTFWithKwargs)
>>> spark.sql("SELECT * FROM test_udtf(1, x => 'x', b => 'b')").show()
+---+---+---+
|  a|  b|  x|
+---+---+---+
|  1|  b|  x|
+---+---+---+

Arrow optimization can be explicitly enabled when creating UDTFs:

>>> @udtf(returnType="c1: int, c2: int", useArrow=True)
... class ArrowPlusOne:
...     def eval(self, x: int):
...         yield x, x + 1
...
>>> ArrowPlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
|  1|  2|
+---+---+