{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "1619d229-6f5c-4a31-9992-81bce15f7ef1", "metadata": {}, "source": [ "# Chapter 4: Bug Busting - Debugging PySpark\n", "\n", "PySpark executes applications in a distributed environment, making it challenging to\n", "monitor and debug these applications. It can be difficult to track which nodes are\n", "executing specific code. However, there are multiple methods available within PySpark\n", "to help with debugging. This section will outline how to effectively debug PySpark\n", "applications.\n", "\n", "PySpark operates using Spark as its underlying engine, utilizing Spark Connect server\n", "or Py4J (Spark Classic) to submit and compute jobs in Spark.\n", "\n", "On the driver side, PySpark interacts with the Spark Driver on JVM through Spark\n", "Connect server or Py4J (Spark Classic). When `pyspark.sql.SparkSession` is created and\n", "initialized, PySpark starts to communicate with the Spark Driver.\n", "\n", "On the executor side, Python workers are responsible for executing and managing Python\n", "native functions or data. These workers are only launched if the PySpark application\n", "requires interaction between Python and JVMs such as Python UDF execution. They are\n", "initiated on-demand, for instance, when running pandas UDFs or PySpark RDD APIs." ] }, { "attachments": {}, "cell_type": "markdown", "id": "56890562-0151-45ac-903e-45b4f1d40d33", "metadata": {}, "source": [ "## Spark UI\n", "\n", "### Python UDF Execution\n", "\n", "Debugging a Python UDF in PySpark can be done by simply adding print statements, though\n", "the output won't be visible in the client/driver side since the functions are executed\n", "on the executors - they can be seen in Spark UI. For example, if you have a working\n", "Python UDF:" ] }, { "cell_type": "code", "execution_count": 1, "id": "a9219c08-df6c-40d7-a73d-a5950ee7df0b", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import udf\n", "\n", "@udf(\"integer\")\n", "def my_udf(x):\n", " # Do something with x\n", " return x" ] }, { "attachments": {}, "cell_type": "markdown", "id": "4875da48-03ee-4155-9257-b5514270d591", "metadata": {}, "source": [ "You can add print statements for debugging as shown below:" ] }, { "cell_type": "code", "execution_count": 2, "id": "2b9102be-e7df-4b80-a70e-87ef7e76a913", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(my_udf(id)=0)]" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "@udf(\"integer\")\n", "def my_udf(x):\n", " # Do something with x\n", " print(\"What's going on?\")\n", " return x\n", "\n", "spark.range(1).select(my_udf(\"id\")).collect()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "899b5d67-078a-4a12-8da8-7a841432ace2", "metadata": {}, "source": [ "The output can be viewed in the Spark UI under `stdout`/`stderr` at `Executors` tab.\n", "\n", "" ] }, { "attachments": {}, "cell_type": "markdown", "id": "154262f2-6f22-483b-9676-75901b325c66", "metadata": {}, "source": [ "### Non-Python UDF\n", "\n", "When running non-Python UDF code, debugging is typically done via the Spark UI or\n", "by using `DataFrame.explain(True)`.\n", "\n", "For instance, the code below performs a join between a large DataFrame (`df1`) and a\n", "smaller one (`df2`):" ] }, { "cell_type": "code", "execution_count": 3, "id": "55070bab-5659-4bb1-98ff-1a3eb6231218", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "== Physical Plan ==\n", 
"AdaptiveSparkPlan isFinalPlan=false\n", "+- Project [_1#6L]\n", " +- SortMergeJoin [_1#6L], [_1#8L], Inner\n", " :- Sort [_1#6L ASC NULLS FIRST], false, 0\n", " : +- Exchange hashpartitioning(_1#6L, 200), ENSURE_REQUIREMENTS, [plan_id=41]\n", " : +- Filter isnotnull(_1#6L)\n", " : +- Scan ExistingRDD[_1#6L]\n", " +- Sort [_1#8L ASC NULLS FIRST], false, 0\n", " +- Exchange hashpartitioning(_1#8L, 200), ENSURE_REQUIREMENTS, [plan_id=42]\n", " +- Filter isnotnull(_1#8L)\n", " +- Scan ExistingRDD[_1#8L]\n", "\n", "\n" ] } ], "source": [ "df1 = spark.createDataFrame([(x,) for x in range(100)])\n", "df2 = spark.createDataFrame([(x,) for x in range(2)])\n", "df1.join(df2, \"_1\").explain()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "b8b7a595-006c-4417-9683-bc002d85b789", "metadata": {}, "source": [ "Using `DataFrame.explain` displays the physical plans, showing how the join will\n", "be executed. Those physical plans represent individual steps for the whole execution.\n", "Here, it exchanges, a.k.a. shuffles, the data and performs a sort-merge-join." ] }, { "attachments": {}, "cell_type": "markdown", "id": "97fc5dcb-8531-4618-a66f-d84ca5095fdd", "metadata": {}, "source": [ "\n", "After checking how the plans are generated via this method, users can optimize their queries.\n", "For example, because `df2` is very small, it can be broadcasted to executors\n", "and remove the shuffle\n" ] }, { "cell_type": "code", "execution_count": 4, "id": "7a1a985e-d260-49ce-a054-e70e3ed7e9e9", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "== Physical Plan ==\n", "AdaptiveSparkPlan isFinalPlan=false\n", "+- Project [_1#6L]\n", " +- BroadcastHashJoin [_1#6L], [_1#8L], Inner, BuildRight, false\n", " :- Filter isnotnull(_1#6L)\n", " : +- Scan ExistingRDD[_1#6L]\n", " +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=71]\n", " +- Filter isnotnull(_1#8L)\n", " +- Scan ExistingRDD[_1#8L]\n", "\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import broadcast\n", "\n", "df1.join(broadcast(df2), \"_1\").explain()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "510949c3-91d1-475f-948f-34eed8617a41", "metadata": {}, "source": [ "As can be seen the shuffle is removed, and it performs broadcast-hash-join:" ] }, { "attachments": {}, "cell_type": "markdown", "id": "028aab52-adea-4b5d-806a-de79e9c54e71", "metadata": {}, "source": [ "\n", "These optimizations can also be visualized in the Spark UI under the SQL / DataFrame\n", "tab after execution.\n" ] }, { "cell_type": "code", "execution_count": 5, "id": "5cc67309-cd0b-49dc-b8d9-8d2c3a5aa944", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(_1=0), Row(_1=1)]" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df1.join(df2, \"_1\").collect()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "97a09efc-8fcd-400c-a052-8aef3bf7ce15", "metadata": {}, "source": [ "\n", "\n" ] }, { "cell_type": "code", "execution_count": 6, "id": "25b0f45f-26b6-485d-88df-a1fb323fd3f4", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(_1=0), Row(_1=1)]" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df1.join(broadcast(df2), \"_1\").collect()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "657d75db-1a64-4a97-8019-4ad1dd974997", "metadata": {}, "source": [ "" ] }, { "attachments": {}, "cell_type": "markdown", "id": "cf1ab4f6-fe61-409a-9da5-afabdd7c987e", 
"metadata": {}, "source": [ "## Monitor with `top` and `ps`\n", "\n", "On the driver side, you can obtain the process ID from your PySpark shell to\n", "monitor resources:" ] }, { "cell_type": "code", "execution_count": 7, "id": "f37891f2-1bfc-4995-ba8e-9ac351935bc8", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "23976" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import os; os.getpid()" ] }, { "cell_type": "code", "execution_count": 8, "id": "cef297f1-3772-40d9-85d9-bdd0c9761c38", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " UID PID PPID C STIME TTY TIME CMD\n", " 502 23976 21512 0 12:06PM ?? 0:02.30 /opt/miniconda3/envs/python3.11/bin/python -m ipykernel_launcher -f /Users/hyukjin.kwon/Library/Jupyter/runtime/kernel-c8eb73ef-2b21-418e-b770-92b946454606.json\n" ] } ], "source": [ "%%bash\n", "ps -fe 23976" ] }, { "attachments": {}, "cell_type": "markdown", "id": "296a6fbb-7ca8-448b-82f4-e7ee6d4359e2", "metadata": {}, "source": [ "On the executor side, you can use `grep` to find the process IDs and resources for\n", "Python workers, as these are forked from `pyspark.daemon`." ] }, { "cell_type": "code", "execution_count": 9, "id": "15b6127c-67ef-4b6f-b4be-7c05af5d12bb", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " 502 23989 23981 0 12:06PM ?? 0:00.59 python3 -m pyspark.daemon pyspark.worker\n", " 502 23990 23989 0 12:06PM ?? 0:00.19 python3 -m pyspark.daemon pyspark.worker\n", " 502 23991 23989 0 12:06PM ?? 0:00.19 python3 -m pyspark.daemon pyspark.worker\n", " 502 23992 23989 0 12:06PM ?? 0:00.19 python3 -m pyspark.daemon pyspark.worker\n", " 502 23993 23989 0 12:06PM ?? 0:00.19 python3 -m pyspark.daemon pyspark.worker\n" ] } ], "source": [ "%%bash\n", "ps -fe | grep pyspark.daemon | head -n 5" ] }, { "attachments": {}, "cell_type": "markdown", "id": "d4088643-1903-4d24-b4b6-cce097a92124", "metadata": {}, "source": [ "Typically, users leverage top and the identified PIDs to monitor the memory usage\n", "of Python processes in PySpark." ] }, { "attachments": {}, "cell_type": "markdown", "id": "2949bb68-0570-44b9-af19-d0b3c260dc49", "metadata": {}, "source": [ "## Use PySpark Profilers\n", "\n", "### Memory Profiler\n", "\n", "In order to debug the driver side, users typically can use most of the existing\n", "Python tools such as [memory_profiler](https://github.com/pythonprofilers/memory_profiler)\n", "that allow you to check the memory usage line by line. If your driver program\n", "is not running on another machine (e.g., YARN cluster mode), you can use a memory\n", "profiler to debug memory usage on the driver side. 
For example:" ] }, { "cell_type": "code", "execution_count": 10, "id": "cee1ae3c-0abe-4e38-b904-7f0c803441a5", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Filename: profile_memory.py\n", "\n", "Line # Mem usage Increment Occurrences Line Contents\n", "=============================================================\n", " 4 80.6 MiB 80.6 MiB 1 @profile\n", " 5 #=====================================================\n", " 6 def my_func():\n", " 7 79.0 MiB -1.7 MiB 1 session = SparkSession.builder.getOrCreate()\n", " 8 80.1 MiB 1.1 MiB 1 df = session.range(10000)\n", " 9 84.1 MiB 4.0 MiB 1 return df.collect()\n", "\n", "\n" ] } ], "source": [ "%%bash\n", "\n", "echo \"from pyspark.sql import SparkSession\n", "#===Your function should be decorated with @profile===\n", "from memory_profiler import profile\n", "@profile\n", "#=====================================================\n", "def my_func():\n", " session = SparkSession.builder.getOrCreate()\n", " df = session.range(10000)\n", " return df.collect()\n", "if __name__ == '__main__':\n", " my_func()\" > profile_memory.py\n", "\n", "python -m memory_profiler profile_memory.py 2> /dev/null" ] }, { "attachments": {}, "cell_type": "markdown", "id": "98340ba5-abe0-4f92-ae48-26b63c6f5811", "metadata": {}, "source": [ "It shows which line consumes how much memory properly." ] }, { "attachments": {}, "cell_type": "markdown", "id": "7f01c836", "metadata": {}, "source": [ "#### Python and Pandas UDF" ] }, { "attachments": {}, "cell_type": "markdown", "id": "cc3e1ccc", "metadata": {}, "source": [ "