{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "1619d229-6f5c-4a31-9992-81bce15f7ef1", "metadata": {}, "source": [ "# Chapter 4: Bug Busting - Debugging PySpark\n", "\n", "PySpark executes applications in a distributed environment, making it challenging to\n", "monitor and debug these applications. It can be difficult to track which nodes are\n", "executing specific code. However, there are multiple methods available within PySpark\n", "to help with debugging. This section will outline how to effectively debug PySpark\n", "applications.\n", "\n", "PySpark operates using Spark as its underlying engine, utilizing Spark Connect server\n", "or Py4J (Spark Classic) to submit and compute jobs in Spark.\n", "\n", "On the driver side, PySpark interacts with the Spark Driver on JVM through Spark\n", "Connect server or Py4J (Spark Classic). When `pyspark.sql.SparkSession` is created and\n", "initialized, PySpark starts to communicate with the Spark Driver.\n", "\n", "On the executor side, Python workers are responsible for executing and managing Python\n", "native functions or data. These workers are only launched if the PySpark application\n", "requires interaction between Python and JVMs such as Python UDF execution. They are\n", "initiated on-demand, for instance, when running pandas UDFs or PySpark RDD APIs." ] }, { "attachments": {}, "cell_type": "markdown", "id": "56890562-0151-45ac-903e-45b4f1d40d33", "metadata": {}, "source": [ "## Spark UI\n", "\n", "### Python UDF Execution\n", "\n", "Debugging a Python UDF in PySpark can be done by simply adding print statements, though\n", "the output won't be visible in the client/driver side since the functions are executed\n", "on the executors - they can be seen in Spark UI. 
For example, if you have a working\n", "Python UDF:" ] }, { "cell_type": "code", "execution_count": 1, "id": "a9219c08-df6c-40d7-a73d-a5950ee7df0b", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import udf\n", "\n", "@udf(\"integer\")\n", "def my_udf(x):\n", " # Do something with x\n", " return x" ] }, { "attachments": {}, "cell_type": "markdown", "id": "4875da48-03ee-4155-9257-b5514270d591", "metadata": {}, "source": [ "You can add print statements for debugging as shown below:" ] }, { "cell_type": "code", "execution_count": 2, "id": "2b9102be-e7df-4b80-a70e-87ef7e76a913", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(my_udf(id)=0)]" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "@udf(\"integer\")\n", "def my_udf(x):\n", " # Do something with x\n", " print(\"What's going on?\")\n", " return x\n", "\n", "spark.range(1).select(my_udf(\"id\")).collect()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "899b5d67-078a-4a12-8da8-7a841432ace2", "metadata": {}, "source": [ "The output can be viewed in the Spark UI under `stdout`/`stderr` at `Executors` tab.\n", "\n", "![Spark UI print](./assets/pyspark-ui-print.png)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "154262f2-6f22-483b-9676-75901b325c66", "metadata": {}, "source": [ "### Non-Python UDF\n", "\n", "When running non-Python UDF code, debugging is typically done via the Spark UI or\n", "by using `DataFrame.explain(True)`.\n", "\n", "For instance, the code below performs a join between a large DataFrame (`df1`) and a\n", "smaller one (`df2`):" ] }, { "cell_type": "code", "execution_count": 3, "id": "55070bab-5659-4bb1-98ff-1a3eb6231218", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "== Physical Plan ==\n", "AdaptiveSparkPlan isFinalPlan=false\n", "+- Project [_1#6L]\n", " +- SortMergeJoin [_1#6L], [_1#8L], Inner\n", " :- Sort [_1#6L ASC NULLS FIRST], false, 0\n", " : +- 
Exchange hashpartitioning(_1#6L, 200), ENSURE_REQUIREMENTS, [plan_id=41]\n", " : +- Filter isnotnull(_1#6L)\n", " : +- Scan ExistingRDD[_1#6L]\n", " +- Sort [_1#8L ASC NULLS FIRST], false, 0\n", " +- Exchange hashpartitioning(_1#8L, 200), ENSURE_REQUIREMENTS, [plan_id=42]\n", " +- Filter isnotnull(_1#8L)\n", " +- Scan ExistingRDD[_1#8L]\n", "\n", "\n" ] } ], "source": [ "df1 = spark.createDataFrame([(x,) for x in range(100)])\n", "df2 = spark.createDataFrame([(x,) for x in range(2)])\n", "df1.join(df2, \"_1\").explain()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "b8b7a595-006c-4417-9683-bc002d85b789", "metadata": {}, "source": [ "`DataFrame.explain` displays the physical plan, showing how the join will\n", "be executed. Each node in the plan represents an individual step of the overall execution.\n", "Here, Spark exchanges (shuffles) the data on both sides and performs a sort-merge join." ] }, { "attachments": {}, "cell_type": "markdown", "id": "97fc5dcb-8531-4618-a66f-d84ca5095fdd", "metadata": {}, "source": [ "\n", "After checking how the plans are generated via this method, users can optimize their queries.\n", "For example, because `df2` is very small, it can be broadcast to the executors,\n", "which removes the shuffle:\n" ] }, { "cell_type": "code", "execution_count": 4, "id": "7a1a985e-d260-49ce-a054-e70e3ed7e9e9", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "== Physical Plan ==\n", "AdaptiveSparkPlan isFinalPlan=false\n", "+- Project [_1#6L]\n", " +- BroadcastHashJoin [_1#6L], [_1#8L], Inner, BuildRight, false\n", " :- Filter isnotnull(_1#6L)\n", " : +- Scan ExistingRDD[_1#6L]\n", " +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=71]\n", " +- Filter isnotnull(_1#8L)\n", " +- Scan ExistingRDD[_1#8L]\n", "\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import broadcast\n", "\n", "df1.join(broadcast(df2), \"_1\").explain()" ] }, { "attachments": {}, "cell_type": 
"markdown", "id": "510949c3-91d1-475f-948f-34eed8617a41", "metadata": {}, "source": [ "As the plan above shows, the shuffle is removed and Spark performs a broadcast hash join." ] }, { "attachments": {}, "cell_type": "markdown", "id": "028aab52-adea-4b5d-806a-de79e9c54e71", "metadata": {}, "source": [ "\n", "These optimizations can also be visualized in the Spark UI under the SQL / DataFrame\n", "tab after execution.\n" ] }, { "cell_type": "code", "execution_count": 5, "id": "5cc67309-cd0b-49dc-b8d9-8d2c3a5aa944", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(_1=0), Row(_1=1)]" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df1.join(df2, \"_1\").collect()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "97a09efc-8fcd-400c-a052-8aef3bf7ce15", "metadata": {}, "source": [ "\n", "![PySpark UI SQL](./assets/pyspark-ui-sql.png)\n" ] }, { "cell_type": "code", "execution_count": 6, "id": "25b0f45f-26b6-485d-88df-a1fb323fd3f4", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(_1=0), Row(_1=1)]" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df1.join(broadcast(df2), \"_1\").collect()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "657d75db-1a64-4a97-8019-4ad1dd974997", "metadata": {}, "source": [ "![PySpark UI SQL broadcast](./assets/pyspark-ui-sql-broadcast.png)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "cf1ab4f6-fe61-409a-9da5-afabdd7c987e", "metadata": {}, "source": [ "## Monitor with `top` and `ps`\n", "\n", "On the driver side, you can obtain the process ID from your PySpark shell to\n", "monitor resources:" ] }, { "cell_type": "code", "execution_count": 7, "id": "f37891f2-1bfc-4995-ba8e-9ac351935bc8", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "23976" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import os; os.getpid()" ] }, { "cell_type": "code", "execution_count": 
8, "id": "cef297f1-3772-40d9-85d9-bdd0c9761c38", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " UID PID PPID C STIME TTY TIME CMD\n", " 502 23976 21512 0 12:06PM ?? 0:02.30 /opt/miniconda3/envs/python3.11/bin/python -m ipykernel_launcher -f /Users/hyukjin.kwon/Library/Jupyter/runtime/kernel-c8eb73ef-2b21-418e-b770-92b946454606.json\n" ] } ], "source": [ "%%bash\n", "ps -fe 23976" ] }, { "attachments": {}, "cell_type": "markdown", "id": "296a6fbb-7ca8-448b-82f4-e7ee6d4359e2", "metadata": {}, "source": [ "On the executor side, you can use `grep` to find the process IDs and resources for\n", "Python workers, as these are forked from `pyspark.daemon`." ] }, { "cell_type": "code", "execution_count": 9, "id": "15b6127c-67ef-4b6f-b4be-7c05af5d12bb", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " 502 23989 23981 0 12:06PM ?? 0:00.59 python3 -m pyspark.daemon pyspark.worker\n", " 502 23990 23989 0 12:06PM ?? 0:00.19 python3 -m pyspark.daemon pyspark.worker\n", " 502 23991 23989 0 12:06PM ?? 0:00.19 python3 -m pyspark.daemon pyspark.worker\n", " 502 23992 23989 0 12:06PM ?? 0:00.19 python3 -m pyspark.daemon pyspark.worker\n", " 502 23993 23989 0 12:06PM ?? 0:00.19 python3 -m pyspark.daemon pyspark.worker\n" ] } ], "source": [ "%%bash\n", "ps -fe | grep pyspark.daemon | head -n 5" ] }, { "attachments": {}, "cell_type": "markdown", "id": "d4088643-1903-4d24-b4b6-cce097a92124", "metadata": {}, "source": [ "Typically, users leverage top and the identified PIDs to monitor the memory usage\n", "of Python processes in PySpark." 
] }, { "attachments": {}, "cell_type": "markdown", "id": "2949bb68-0570-44b9-af19-d0b3c260dc49", "metadata": {}, "source": [ "## Use PySpark Profilers\n", "\n", "### Memory Profiler\n", "\n", "To debug the driver side, you can use most existing Python tools, such as\n", "[memory_profiler](https://github.com/pythonprofilers/memory_profiler),\n", "which reports memory usage line by line. This works as long as your driver program\n", "is not running on another machine (e.g., YARN cluster mode). For example:" ] }, { "cell_type": "code", "execution_count": 10, "id": "cee1ae3c-0abe-4e38-b904-7f0c803441a5", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Filename: profile_memory.py\n", "\n", "Line # Mem usage Increment Occurrences Line Contents\n", "=============================================================\n", " 4 80.6 MiB 80.6 MiB 1 @profile\n", " 5 #=====================================================\n", " 6 def my_func():\n", " 7 79.0 MiB -1.7 MiB 1 session = SparkSession.builder.getOrCreate()\n", " 8 80.1 MiB 1.1 MiB 1 df = session.range(10000)\n", " 9 84.1 MiB 4.0 MiB 1 return df.collect()\n", "\n", "\n" ] } ], "source": [ "%%bash\n", "\n", "echo \"from pyspark.sql import SparkSession\n", "#===Your function should be decorated with @profile===\n", "from memory_profiler import profile\n", "@profile\n", "#=====================================================\n", "def my_func():\n", "    session = SparkSession.builder.getOrCreate()\n", "    df = session.range(10000)\n", "    return df.collect()\n", "if __name__ == '__main__':\n", "    my_func()\" > profile_memory.py\n", "\n", "python -m memory_profiler profile_memory.py 2> /dev/null" ] }, { "attachments": {}, "cell_type": "markdown", "id": "98340ba5-abe0-4f92-ae48-26b63c6f5811", "metadata": {}, "source": [ "The output shows how much memory each line consumes." 
] }, { "attachments": {}, "cell_type": "markdown", "id": "7f01c836", "metadata": {}, "source": [ "#### Python and Pandas UDF" ] }, { "attachments": {}, "cell_type": "markdown", "id": "cc3e1ccc", "metadata": {}, "source": [ "Note: This section applies to Spark 4.0" ] }, { "attachments": {}, "cell_type": "markdown", "id": "0b2926cf-df02-42c8-98fa-5822849e901f", "metadata": {}, "source": [ "PySpark provides a remote [memory_profiler](https://github.com/pythonprofilers/memory_profiler)\n", "for Python/Pandas UDFs. It can be used in editors that show line numbers, such as\n", "Jupyter notebooks. The SparkSession-based memory profiler is enabled by setting\n", "the runtime SQL configuration `spark.sql.pyspark.udf.profiler` to `memory`:" ] }, { "cell_type": "code", "execution_count": 11, "id": "553d9780-b30b-4e96-a134-ca1c06341c89", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "============================================================\n", "Profile of UDF\n", "============================================================\n", "Filename: /var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/885006762.py\n", "\n", "Line # Mem usage Increment Occurrences Line Contents\n", "=============================================================\n", " 5 1472.6 MiB 1472.6 MiB 10 @pandas_udf(\"long\")\n", " 6 def add1(x):\n", " 7 1473.9 MiB 1.3 MiB 10 return x + 1\n", "\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import pandas_udf\n", "\n", "df = spark.range(10)\n", "\n", "@pandas_udf(\"long\")\n", "def add1(x):\n", "    return x + 1\n", "\n", "spark.conf.set(\"spark.sql.pyspark.udf.profiler\", \"memory\")\n", "\n", "added = df.select(add1(\"id\"))\n", "spark.profile.clear()\n", "added.collect()\n", "spark.profile.show(type=\"memory\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "abaf0439-43de-4482-8a80-258be3d98366", "metadata": {}, "source": [ "The UDF IDs can be seen in the query plan, for example, `add1(...)#16L` in\n", "`ArrowEvalPython` as shown below." 
] }, { "cell_type": "code", "execution_count": 12, "id": "607f9f0c-6288-4bd2-99b8-2cc7e62098ff", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "== Physical Plan ==\n", "*(2) Project [pythonUDF0#19L AS add1(id)#17L]\n", "+- ArrowEvalPython [add1(id#14L)#16L], [pythonUDF0#19L], 200\n", " +- *(1) Range (0, 10, step=1, splits=16)\n", "\n", "\n" ] } ], "source": [ "added.explain()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "06e40cdc", "metadata": {}, "source": [ "### Performance Profiler" ] }, { "attachments": {}, "cell_type": "markdown", "id": "7909aba0", "metadata": {}, "source": [ "Note: This section applies to Spark 4.0" ] }, { "attachments": {}, "cell_type": "markdown", "id": "e66c991c-1b07-45d0-aae2-c79c362e2210", "metadata": {}, "source": [ "[Python Profilers](https://docs.python.org/3/library/profile.html) are useful built-in\n", "features of Python itself. On the driver side, you can use them as you would\n", "for any regular Python program, because the PySpark driver is a regular Python\n", "process unless the driver program runs on another machine\n", "(e.g., YARN cluster mode)." ] }, { "cell_type": "code", "execution_count": 13, "id": "8d9ada24-81da-4c31-aaa2-b3578953b07b", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " 549275 function calls (536745 primitive calls) in 3.447 seconds\n", "\n", " Ordered by: cumulative time\n", "\n", " ncalls tottime percall cumtime percall filename:lineno(function)\n", " 2 0.000 0.000 3.448 1.724 app.py:1()\n", " 792/1 0.005 0.000 3.447 3.447 {built-in method builtins.exec}\n", " 128 0.000 0.000 2.104 0.016 socket.py:692(readinto)\n", " 128 2.104 0.016 2.104 0.016 {method 'recv_into' of '_socket.socket' objects}\n", " 124 0.000 0.000 2.100 0.017 java_gateway.py:1015(send_command)\n", " 125 0.001 0.000 2.099 0.017 clientserver.py:499(send_command)\n", " 138 0.000 0.000 2.097 0.015 {method 'readline' of '_io.BufferedReader' objects}\n", " 55 0.000 0.000 1.622 0.029 java_gateway.py:1313(__call__)\n", " 95 0.001 0.000 1.360 0.014 __init__.py:1()\n", " 1 0.000 0.000 1.359 1.359 session.py:438(getOrCreate)\n", " 1 0.000 0.000 1.311 1.311 context.py:491(getOrCreate)\n", " 1 0.000 0.000 1.311 1.311 context.py:169(__init__)\n", " 1 0.000 0.000 0.861 0.861 context.py:424(_ensure_initialized)\n", " 1 0.001 0.001 0.861 0.861 java_gateway.py:39(launch_gateway)\n", " 8 0.840 0.105 0.840 0.105 {built-in method time.sleep}\n" ] } ], "source": [ "%%bash\n", "\n", "echo \"from pyspark.sql import SparkSession\n", "spark = SparkSession.builder.getOrCreate()\n", "spark.range(10).collect()\" > 
app.py\n", "\n", "python -m cProfile -s cumulative app.py 2> /dev/null | head -n 20" ] }, { "attachments": {}, "cell_type": "markdown", "id": "3e5ae42d", "metadata": {}, "source": [ "#### Python/Pandas UDF" ] }, { "attachments": {}, "cell_type": "markdown", "id": "eb137611", "metadata": {}, "source": [ "Note: This section applies to Spark 4.0" ] }, { "attachments": {}, "cell_type": "markdown", "id": "c9b9a26c-56fa-4a14-adfd-2697a87c479e", "metadata": {}, "source": [ "PySpark provides remote Python profilers for Python/Pandas UDFs. UDFs with\n", "iterators as inputs/outputs are not supported. The SparkSession-based performance\n", "profiler is enabled by setting the runtime SQL configuration\n", "`spark.sql.pyspark.udf.profiler` to `perf`, as shown below." ] }, { "cell_type": "code", "execution_count": 14, "id": "fcba873d-7a4f-42f9-b796-3c492c7e8077", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "============================================================\n", "Profile of UDF\n", "============================================================\n", " 2130 function calls (2080 primitive calls) in 0.003 seconds\n", "\n", " Ordered by: internal time, cumulative time\n", "\n", " ncalls tottime percall cumtime percall filename:lineno(function)\n", " 10 0.001 0.000 0.003 0.000 common.py:62(new_method)\n", " 10 0.000 0.000 0.000 0.000 {built-in method _operator.add}\n", " 10 0.000 0.000 0.002 0.000 base.py:1371(_arith_method)\n", " 10 0.000 0.000 0.001 0.000 series.py:389(__init__)\n", " 20 0.000 0.000 0.000 0.000 _ufunc_config.py:33(seterr)\n", " 10 0.000 0.000 0.001 0.000 series.py:6201(_construct_result)\n", " 10 0.000 0.000 0.000 0.000 cast.py:1605(maybe_cast_to_integer_array)\n", " 10 0.000 0.000 0.000 0.000 construction.py:517(sanitize_array)\n", " 10 0.000 0.000 0.002 0.000 series.py:6133(_arith_method)\n", " 10 0.000 0.000 0.000 0.000 managers.py:1863(from_array)\n", " 10 0.000 0.000 0.000 0.000 array_ops.py:240(arithmetic_op)\n", " 510 0.000 0.000 0.000 0.000 {built-in method builtins.isinstance}\n" ] } ], "source": [ "import io\n", "from contextlib import redirect_stdout\n", "\n", "from pyspark.sql.functions import pandas_udf\n", "\n", "df = spark.range(10)\n", "@pandas_udf(\"long\")\n", "def add1(x):\n", "    return x + 1\n", "\n", "added = 
df.select(add1(\"id\"))\n", "\n", "spark.conf.set(\"spark.sql.pyspark.udf.profiler\", \"perf\")\n", "spark.profile.clear()\n", "added.collect()\n", "\n", "# Only show the first 20 lines of the profile output\n", "output = io.StringIO()\n", "with redirect_stdout(output):\n", "    spark.profile.show(type=\"perf\")\n", "\n", "print(\"\\n\".join(output.getvalue().split(\"\\n\")[0:20]))" ] }, { "attachments": {}, "cell_type": "markdown", "id": "b35fb6c4-c074-4866-abd6-b1b435721b67", "metadata": {}, "source": [ "The UDF IDs can be seen in the query plan, for example, `add1(...)#22L` in\n", "`ArrowEvalPython` below." ] }, { "cell_type": "code", "execution_count": 15, "id": "de015526-577b-45ea-a6e5-598cf215ef8b", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "== Physical Plan ==\n", "*(2) Project [pythonUDF0#25L AS add1(id)#23L]\n", "+- ArrowEvalPython [add1(id#20L)#22L], [pythonUDF0#25L], 200\n", " +- *(1) Range (0, 10, step=1, splits=16)\n", "\n", "\n" ] } ], "source": [ "added.explain()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "fe046e5e-73bb-466f-a373-9d5a445b0fa1", "metadata": {}, "source": [ "We can render the result with a preregistered renderer as shown below." ] }, { "cell_type": "code", "execution_count": 16, "id": "e2507dc0-ab64-4afe-ae8a-19c52533e57c", "metadata": {}, "outputs": [], "source": [ "spark.profile.render(id=2, type=\"perf\") # renderer=\"flameprof\" by default" ] }, { "attachments": {}, "cell_type": "markdown", "id": "ba892df0-4058-46ad-a952-791559da5259", "metadata": {}, "source": [ "![PySpark UDF profiling](./assets/pyspark-udf-profile.png)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "09b420ba", "metadata": {}, "source": [ "## Display Stacktraces" ] }, { "attachments": {}, "cell_type": "markdown", "id": "9756e41b", "metadata": {}, "source": [ "Note: This section applies to Spark 4.0" ] }, { "attachments": {}, "cell_type": "markdown", "id": "c7bf0b21-f9a8-4cc0-8288-46f7ef4f4f52", "metadata": {}, "source": [ "By default, JVM stacktraces and Python internal tracebacks are hidden, especially\n", "in Python UDF executions. For example," ] }, { "cell_type": "code", "execution_count": 17, "id": "bafce04e-5d7c-40e0-9342-0ffb94c858c7", "metadata": {}, "outputs": [ { "ename": "PythonException", "evalue": "\n An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (most recent call last):\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3806637820.py\", line 3, in \nZeroDivisionError: division by zero\n", "output_type": "error", "traceback": [ "\u001b[0;31mPythonException\u001b[0m: \n An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (most recent call last):\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3806637820.py\", line 3, in \nZeroDivisionError: division by zero\n" ] } ], "source": [ "from pyspark.sql.functions import udf\n", "\n", "spark.range(1).select(udf(lambda x: x / 0)(\"id\")).collect()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "78100070-b9db-4efd-ad94-a8e6a00ee68a", "metadata": {}, "source": [ "\n", "To show the full Python worker traceback, disable\n", "`spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled`. To also show the JVM stacktrace,\n", "enable `spark.sql.pyspark.jvmStacktrace.enabled`. The two cells below demonstrate each setting in turn.\n" ] }, { "cell_type": "code", "execution_count": 18, "id": "425b24b8-acd0-4897-bc8c-75af6316f430", "metadata": {}, "outputs": [ { "ename": "PythonException", "evalue": "\n An exception was thrown from the Python worker. 
Please see the stack trace below.\nTraceback (most recent call last):\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1898, in main\n process()\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1890, in process\n serializer.dump_stream(out_iter, outfile)\n File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 224, in dump_stream\n self.serializer.dump_stream(self._batched(iterator), stream)\n File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 145, in dump_stream\n for obj in iterator:\n File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 213, in _batched\n for item in iterator:\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1798, in mapper\n result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1798, in \n result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 114, in \n return args_kwargs_offsets, lambda *a: func(*a)\n ^^^^^^^^\n File \"/.../python/lib/pyspark.zip/pyspark/util.py\", line 145, in wrapper\n return f(*args, **kwargs)\n ^^^^^^^^^^^^^^^^^^\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 739, in profiling_func\n ret = f(*args, **kwargs)\n ^^^^^^^^^^^^^^^^^^\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3570641234.py\", line 3, in \nZeroDivisionError: division by zero\n", "output_type": "error", "traceback": [ "\u001b[0;31mPythonException\u001b[0m: \n An exception was thrown from the Python worker. 
Please see the stack trace below.\nTraceback (most recent call last):\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1898, in main\n process()\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1890, in process\n serializer.dump_stream(out_iter, outfile)\n File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 224, in dump_stream\n self.serializer.dump_stream(self._batched(iterator), stream)\n File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 145, in dump_stream\n for obj in iterator:\n File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 213, in _batched\n for item in iterator:\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1798, in mapper\n result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1798, in \n result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 114, in \n return args_kwargs_offsets, lambda *a: func(*a)\n ^^^^^^^^\n File \"/.../python/lib/pyspark.zip/pyspark/util.py\", line 145, in wrapper\n return f(*args, **kwargs)\n ^^^^^^^^^^^^^^^^^^\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 739, in profiling_func\n ret = f(*args, **kwargs)\n ^^^^^^^^^^^^^^^^^^\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3570641234.py\", line 3, in \nZeroDivisionError: division by zero\n" ] } ], "source": [ "spark.conf.set(\"spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled\", False)\n", "spark.conf.set(\"spark.sql.pyspark.jvmStacktrace.enabled\", False)\n", "spark.range(1).select(udf(lambda x: x / 0)(\"id\")).collect()" ] }, { "cell_type": "code", "execution_count": 19, "id": "5ec62d86-3631-4f48-a10c-4bcd727b1eb6", "metadata": {}, "outputs": [ { "ename": 
"PythonException", "evalue": "\n An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (most recent call last):\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3514597595.py\", line 3, in \nZeroDivisionError: division by zero\n\n\nJVM stacktrace:\norg.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 13.0 failed 1 times, most recent failure: Lost task 15.0 in stage 13.0 (TID 161) (ip-192-168-45-94.ap-northeast-2.compute.internal executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3514597595.py\", line 3, in \nZeroDivisionError: division by zero\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:531)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:103)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:485)\n\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)\n\tat scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)\n\tat scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)\n\tat 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:338)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)\n\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:146)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2887)\n\tat scala.Option.getOrElse(Option.scala:201)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2887)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2879)\n\tat scala.collection.immutable.List.foreach(List.scala:334)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2879)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1283)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1283)\n\tat scala.Option.foreach(Option.scala:437)\n\tat 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1283)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3158)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3092)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3081)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2479)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2498)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2523)\n\tat org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1057)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:417)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:1056)\n\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)\n\tat org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4265)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)\n\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:608)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:155)\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:118)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)\n\tat 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:74)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:222)\n\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)\n\tat org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4262)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3514597595.py\", line 3, in \nZeroDivisionError: division by zero\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:531)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:103)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:485)\n\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n\tat 
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)\n\tat scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)\n\tat scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:338)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)\n\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:146)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\t... 
1 more\n", "output_type": "error", "traceback": [ "\u001b[0;31mPythonException\u001b[0m: \n An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (most recent call last):\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3514597595.py\", line 3, in \nZeroDivisionError: division by zero\n\n\nJVM stacktrace:\norg.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 13.0 failed 1 times, most recent failure: Lost task 15.0 in stage 13.0 (TID 161) (ip-192-168-45-94.ap-northeast-2.compute.internal executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3514597595.py\", line 3, in \nZeroDivisionError: division by zero\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:531)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:103)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:485)\n\t...\n" ] } ], "source": [ "spark.conf.set(\"spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled\", True)\n", "spark.conf.set(\"spark.sql.pyspark.jvmStacktrace.enabled\", True)\n", "spark.range(1).select(udf(lambda x: x / 0)(\"id\")).collect()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "59bab886-7a57-4736-89b1-e3776b3b991e", "metadata": {}, "source": [ "See also [Stack Traces](https://spark.apache.org/docs/latest/api/python/development/debugging.html#stack-traces) for more details." 
] }, { "attachments": {}, "cell_type": "markdown", "id": "0611287d-cb34-457e-9bc3-f5629ddea484", "metadata": {}, "source": [ "\n", "## IDE Debugging" ] }, { "attachments": {}, "cell_type": "markdown", "id": "80f080c4", "metadata": {}, "source": [ "On the driver side, no additional steps are needed to debug your PySpark application in an IDE. Refer to the guide below:\n", "\n", "- [Setting up IDEs](https://spark.apache.org/docs/latest/api/python/development/setting_ide.html)\n", "\n", "On the executor side, setting up the remote debugger requires several steps. Refer to the guide below:\n", "\n", "- [Remote Debugging (PyCharm Professional)](https://spark.apache.org/docs/latest/api/python/development/debugging.html#remote-debugging-pycharm-professional)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.9" } }, "nbformat": 4, "nbformat_minor": 5 }