pyspark.sql.streaming.DataStreamWriter.toTable

DataStreamWriter.toTable(tableName: str, format: Optional[str] = None, outputMode: Optional[str] = None, partitionBy: Union[str, List[str], None] = None, queryName: Optional[str] = None, **options: OptionalPrimitiveType) → pyspark.sql.streaming.query.StreamingQuery

Starts the execution of the streaming query, which will continually output results to the given table as new data arrives.

The returned StreamingQuery object can be used to interact with the stream.

New in version 3.1.0.

Parameters
tableName : str

the name of the table.

format : str, optional

the format used to save.

outputMode : str, optional

specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.

  • append: Only the new rows in the streaming DataFrame/Dataset will be written to the sink

  • complete: All the rows in the streaming DataFrame/Dataset will be written to the sink every time there are some updates

  • update: Only the rows that were updated in the streaming DataFrame/Dataset will be written to the sink every time there are some updates. If the query doesn’t contain aggregations, it will be equivalent to append mode.
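To make the three modes concrete, the following pure-Python model shows what each trigger would write to the sink for a running word count. It is a conceptual sketch, not Spark code: `simulate_sink_writes` is a hypothetical helper, each inner list stands for the rows of one micro-batch, and watermarks are not modeled (Spark only accepts append mode on a streaming aggregation when a watermark is defined, so the sketch rejects that combination).

```python
from collections import Counter


def simulate_sink_writes(batches, mode, aggregate=True):
    """Hypothetical model: return what each trigger writes to the sink."""
    if mode not in ("append", "complete", "update"):
        raise ValueError(f"unknown output mode: {mode}")
    writes = []
    if not aggregate:
        # Without aggregation every input row is a final result row, so
        # "append" and "update" both write only the rows of this trigger.
        if mode == "complete":
            raise ValueError("complete mode requires a streaming aggregation")
        for batch in batches:
            writes.append(list(batch))
        return writes
    if mode == "append":
        # Append on an aggregation needs a watermark, not modeled here.
        raise ValueError("append on an aggregation needs a watermark (not modeled)")
    counts = Counter()
    for batch in batches:
        counts.update(batch)
        if mode == "complete":
            # The whole result table is written on every trigger.
            writes.append(dict(sorted(counts.items())))
        else:
            # Only keys whose counts changed in this trigger are written.
            writes.append({key: counts[key] for key in sorted(set(batch))})
    return writes


batches = [["a", "b"], ["a"], ["c"]]
print(simulate_sink_writes(batches, "complete"))
# [{'a': 1, 'b': 1}, {'a': 2, 'b': 1}, {'a': 2, 'b': 1, 'c': 1}]
print(simulate_sink_writes(batches, "update"))
# [{'a': 1, 'b': 1}, {'a': 2}, {'c': 1}]
```

For a non-aggregating query the model returns identical writes for append and update, which matches the note on update mode.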

partitionBy : str or list, optional

names of partitioning columns

queryName : str, optional

unique name for the query

**options : dict

All other string options. You may want to provide a checkpointLocation.
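Keyword options end up as string-valued options on the writer. The sketch below assumes the conversion that PySpark's reader/writer helpers apply to option values (Python booleans become lowercase "true"/"false", None is passed through unchanged, everything else goes through str()). `to_option_strings` is a hypothetical name, and `someFlag` and `someLimit` are made-up option keys used only for illustration.

```python
from typing import Dict, Optional, Union

PrimitiveType = Union[bool, float, int, str, None]


def to_option_strings(**options: PrimitiveType) -> Dict[str, Optional[str]]:
    """Hypothetical helper: stringify option values as assumed in the lead-in."""
    converted: Dict[str, Optional[str]] = {}
    for key, value in options.items():
        if isinstance(value, bool):
            # Lower-case booleans so True becomes "true", not "True".
            converted[key] = str(value).lower()
        elif value is None:
            # None is kept as None instead of becoming the string "None".
            converted[key] = None
        else:
            converted[key] = str(value)
    return converted


print(to_option_strings(checkpointLocation="/tmp/ckpt", someLimit=1000, someFlag=True))
# {'checkpointLocation': '/tmp/ckpt', 'someLimit': '1000', 'someFlag': 'true'}
```

Under that assumption, passing `checkpointLocation=d` to `toTable` has the same effect as calling `.option("checkpointLocation", d)` on the writer beforehand.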

Notes

This API is evolving.

For v1 tables, partitioning columns provided by partitionBy are respected whether or not the table already exists. A new table is created if the table does not exist.

For v2 tables, partitionBy is ignored if the table already exists; it is respected only when the v2 table does not yet exist. In addition, a v2 table created by this API lacks some features (e.g., custom properties, options, and serde info). If you need them, create the v2 table manually before starting the query to avoid creating a table with incomplete information.
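The v1/v2 partitioning rules can be restated as a small decision function. This is a hypothetical helper that only encodes the documented behavior; it does not inspect a real catalog, and `table_version` and `table_exists` stand in for facts Spark determines on its own. It also normalizes partitionBy, which may be a single column name or a list of names.

```python
from typing import List, Optional, Union


def applied_partition_columns(
    table_version: str,
    table_exists: bool,
    partition_by: Union[str, List[str], None],
) -> Optional[List[str]]:
    """Hypothetical helper: which partitionBy columns take effect.

    Returns the normalized column list when partitionBy is respected, or
    None when it is ignored (an existing v2 table keeps its own layout).
    """
    if table_version not in ("v1", "v2"):
        raise ValueError(f"unknown table version: {table_version}")
    # partitionBy accepts either one column name or a list of names.
    if partition_by is None:
        columns: List[str] = []
    elif isinstance(partition_by, str):
        columns = [partition_by]
    else:
        columns = list(partition_by)
    if table_version == "v2" and table_exists:
        # Existing v2 table: partitionBy is ignored.
        return None
    # v1 tables (new or existing) and new v2 tables honor partitionBy.
    return columns


print(applied_partition_columns("v1", True, "date"))   # ['date']
print(applied_partition_columns("v2", True, "date"))   # None
print(applied_partition_columns("v2", False, ["date", "hour"]))  # ['date', 'hour']
```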

Examples

Save a data stream to a table.

>>> import tempfile
>>> import time
>>> _ = spark.sql("DROP TABLE IF EXISTS my_table2")
>>> with tempfile.TemporaryDirectory() as d:
...     # Create a table with Rate source.
...     q = spark.readStream.format("rate").option(
...         "rowsPerSecond", 10).load().writeStream.toTable(
...             "my_table2",
...             queryName='that_query',
...             outputMode="append",
...             format='parquet',
...             checkpointLocation=d)
...     time.sleep(3)
...     q.stop()
...     spark.read.table("my_table2").show()
...     _ = spark.sql("DROP TABLE my_table2")
+...---------+-----+
|...timestamp|value|
+...---------+-----+
...