6.11. Better Pandas

This section covers tools that make your experience with pandas a little bit better.

6.11.1. tqdm: Add a Progress Bar to Your Pandas Apply

!pip install tqdm 

If you want a progress bar that shows how far along your pandas apply is, try tqdm. After calling tqdm.pandas(), use progress_apply in place of apply.

import pandas as pd 
from tqdm import tqdm 
import time 

df = pd.DataFrame({'a': [1, 2, 3, 4, 5], 'b': [2, 3, 4, 5, 6]})

tqdm.pandas()  # register progress_apply on pandas objects


def func(value):
    time.sleep(1)  # simulate a slow computation
    return value + 1


df['a'].progress_apply(func)
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:05<00:00,  1.00s/it]
0    2
1    3
2    4
3    5
4    6
Name: a, dtype: int64
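
The example above applies a function to a single column. As a minimal sketch, the same pattern can also be used row-wise on a whole DataFrame. The helper name add_columns and the desc label are illustrative choices, not part of the original example.

```python
import pandas as pd
from tqdm import tqdm

# Register progress_apply on pandas objects; desc only sets the bar's label.
tqdm.pandas(desc="rows")

df = pd.DataFrame({"a": [1, 2, 3, 4, 5], "b": [2, 3, 4, 5, 6]})


def add_columns(row):
    # Hypothetical helper: sum the two columns of a single row.
    return row["a"] + row["b"]


# axis=1 passes each row to the function, as it would with DataFrame.apply.
row_sums = df.progress_apply(add_columns, axis=1)
print(row_sums.tolist())
```

The progress bar then counts rows rather than column values.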

Link to tqdm.

6.11.2. fugue: Use pandas Functions on the Spark and Dask Engines

!pip install fugue pyspark

Wouldn’t it be nice if you could leverage Spark or Dask to parallelize data science workloads using pandas syntax? Fugue allows you to do exactly that.

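Fugue provides the transform function, which lets you run functions written for pandas on the Spark and Dask engines. In the example below, schema describes the output columns ("*" keeps the input columns and price:int adds an integer column), params passes extra arguments to the function, and engine selects where the code runs.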

import pandas as pd
from fugue import transform
from fugue_spark import SparkExecutionEngine

input_df = pd.DataFrame({"id": [0, 1, 2], "fruit": ["apple", "banana", "orange"]})
map_price = {"apple": 2, "banana": 1, "orange": 3}


def map_price_to_fruit(df: pd.DataFrame, mapping: dict) -> pd.DataFrame:
    df["price"] = df["fruit"].map(mapping)
    return df


df = transform(
    input_df,
    map_price_to_fruit,
    schema="*, price:int",
    params=dict(mapping=map_price),
    engine=SparkExecutionEngine,
)
df.show()
+---+------+-----+
| id| fruit|price|
+---+------+-----+
|  0| apple|    2|
|  1|banana|    1|
|  2|orange|    3|
+---+------+-----+

Link to fugue.

6.11.3. FugueSQL: Use SQL to Work with Pandas, Spark, and Dask DataFrames

!pip install fugue 

Do you like to use both Python and SQL to manipulate data? FugueSQL is an interface that allows users to use SQL to work with Pandas, Spark, and Dask DataFrames.

import pandas as pd
from fugue_sql import fsql

input_df = pd.DataFrame({"price": [2, 1, 3], "fruit": ["apple", "banana", "orange"]})

query = """
SELECT price, fruit FROM input_df
WHERE price > 1
PRINT
"""

fsql(query).run()
PandasDataFrame
price:long|fruit:str
----------+---------
2         |apple    
3         |orange   
Total count: 2
DataFrames()

Link to fugue.