pyspark.sql.streaming.DataStreamWriter.start

DataStreamWriter.start(path: Optional[str] = None, format: Optional[str] = None, outputMode: Optional[str] = None, partitionBy: Union[str, List[str], None] = None, queryName: Optional[str] = None, **options: OptionalPrimitiveType) → pyspark.sql.streaming.query.StreamingQuery

Streams the contents of the DataFrame to a data source.

The data source is specified by the format and a set of options. If format is not specified, the default data source configured by spark.sql.sources.default will be used.

New in version 2.0.0.

Parameters
path : str, optional

the path in a Hadoop-supported file system

format : str, optional

the format used to save

outputMode : str, optional

specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.

  • append: Only the new rows in the streaming DataFrame/Dataset will be written to the sink.

  • complete: All the rows in the streaming DataFrame/Dataset will be written to the sink every time there are some updates.

  • update: Only the rows that were updated in the streaming DataFrame/Dataset will be written to the sink every time there are some updates. If the query doesn’t contain aggregations, it will be equivalent to append mode.

partitionBy : str or list, optional

names of partitioning columns

queryName : str, optional

unique name for the query

**options : dict

All other string options. You may want to provide a checkpointLocation for most streams; however, it is not required for a memory stream.

Notes

This API is evolving.
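
As a rough illustration of how the complete and update values of outputMode differ, the sketch below models a running count per key in plain Python. It is a toy model, not Spark code and not how Spark is implemented: `rows_emitted` and the sample batches are hypothetical names introduced here. The append mode is left out because, for aggregated queries, it additionally depends on watermarks.

```python
from collections import Counter


def rows_emitted(batches, mode):
    """Toy model: rows sent to the sink per trigger for a running count.

    NOT Spark code. It only mimics what the 'complete' and 'update'
    output modes would write for an aggregated result table.
    """
    if mode not in ("complete", "update"):
        raise ValueError(f"mode not covered by this toy model: {mode}")
    table = Counter()  # running result table: key -> count
    written = []
    for batch in batches:
        changed = Counter(batch)  # keys touched by this micro-batch
        table.update(changed)     # fold the new rows into the result table
        if mode == "complete":
            rows = dict(table)    # every row of the result table
        else:
            rows = {key: table[key] for key in changed}  # only changed rows
        written.append(rows)
    return written


batches = [["a", "b"], ["a"], ["c"]]  # hypothetical input, one list per trigger
complete_rows = rows_emitted(batches, "complete")
update_rows = rows_emitted(batches, "update")
# Second trigger: complete writes {'a': 2, 'b': 1}, update writes {'a': 2}
```

In this model the complete output grows with the result table, while the update output stays proportional to the number of keys touched by each trigger.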

Examples

>>> df = spark.readStream.format("rate").load()

Basic example.

>>> q = df.writeStream.format('memory').queryName('this_query').start()
>>> q.isActive
True
>>> q.name
'this_query'
>>> q.stop()
>>> q.isActive
False

Example using other parameters together with a trigger.

>>> q = df.writeStream.trigger(processingTime='5 seconds').start(
...     queryName='that_query', outputMode="append", format='memory')
>>> q.name
'that_query'
>>> q.isActive
True
>>> q.stop()
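
Example with a file sink and a checkpointLocation. This is a minimal sketch rather than part of the original examples: the helper `start_parquet_sink`, its `base_dir` argument and the directory names are hypothetical. It assumes that extra keyword arguments such as checkpointLocation are forwarded as options, as described under **options, and that the Parquet file sink is used in append mode.

```python
from pathlib import Path


def start_parquet_sink(df, base_dir, query_name="parquet_sink_demo"):
    """Start a streaming query writing `df` as Parquet files under `base_dir`.

    Hypothetical helper: the name, the arguments and the directory layout
    are assumptions made for illustration only.
    """
    base = Path(base_dir)
    return df.writeStream.start(
        path=str(base / "output"),   # where the Parquet files are written
        format="parquet",
        outputMode="append",         # assumed mode for the file sink
        queryName=query_name,
        checkpointLocation=str(base / "checkpoint"),  # passed via **options
    )


# Usage, assuming an active SparkSession named `spark`:
#   df = spark.readStream.format("rate").load()
#   q = start_parquet_sink(df, "/tmp/start_demo")
#   q.stop()
```

Keeping the output and checkpoint directories under one base directory is only a convenience of this sketch; any two locations on a Hadoop-supported file system should work.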