We'll work on a dataset `gro.csv` for **credit scoring** that was proposed years ago as a data challenge.

It is a realistic and messy dataset: it contains lots of missing values and several types of features/columns (dates, categories, continuous features). Serious data cleaning and formatting is required.

This dataset contains the following columns:

| Column name          | Description |
|:---------------------|:------------|
| BirthDate            | Date of birth of the client |
| Customer_Open_Date   | Creation date of the client's first account at the bank |
| Customer_Type        | Type of client (existing / new) | 
| Educational_Level    | Highest diploma |
| Id_Customer          | Id of the client |
| Marital_Status       | Family situation |
| Nb_Of_Products       | Number of products held by the client |
| Net_Annual_Income    | Annual revenue |
| Number_Of_Dependant  | Number of dependents |
| P_Client             | Non-disclosed feature |
| Prod_Category        | Product category |
| Prod_Closed_Date     | Closing date of the last product |
| Prod_Decision_Date   | Decision date of the last agreement for a financing product |
| Prod_Sub_Category    | Sub-category of the product |
| Source               | Financing source (Branch or Sales) |
| Type_Of_Residence    | Residential situation |
| Y                    | Credit was granted (yes / no) |
| Years_At_Business    | Number of years at the current job position |
| Years_At_Residence   | Number of years at the current residence |

Column `Y` (Credit was granted) is the response variable. The other columns are explanatory columns/covariates.

# Your job

Read the `gro.csv` dataset and work on it using Spark, the `pandas` API on Spark, and the graphical backend associated with this API: `plotly`.

- The column separator in the CSV file is not `,` but `;`, so you need to use the `sep` option of `read_csv`
- The categorical columns must be imported as `category` type
- Something weird is going on with the `Net_Annual_Income` column... Try to understand what is going on and to correct the problem
- Several columns are empty; we need to remove them (or not even read them)
- Dates must be imported as dates and not strings
- Remove rows with missing values

Many of these things can be done right from the beginning, when reading the CSV file, through some options of the `read_csv` function. You may need to carefully read its documentation in order to understand some useful options. Once you are happy with your importation and cleaning of the data, you can:
- Use the plotting facilities of the pandas API on Spark (backed by `plotly`) to perform data visualization...
- ... in order to understand visually the impact of some features on `Y` (credit was granted or not). For this, you need to decide on the plots that make sense for this and produce them

We will provide thorough explanations and code that performs all of this in subsequent sessions.


# A quick and easy (but actually bad) import

Let's import the data into a pandas-on-Spark dataframe, as simply as possible.
The only thing we care about for now is the fact that the column separator
is `';'` and not `','` as it should be in a `.csv` file.


In [None]:
import requests
import os

# The relative path of the folder containing your data
path_data = '../data'
filename = 'gro.csv.gz'
filepath = os.path.join(path_data, filename)

if os.path.exists(filepath):
    print(f'The file {filename} already exists in folder {path_data}/.')
else:
    os.makedirs(path_data, exist_ok=True)  # create the data folder if needed
    url = 'https://stephane-v-boucheron.fr/data/gro.csv.gz'
    r = requests.get(url)
    r.raise_for_status()  # fail loudly instead of saving an error page
    with open(filepath, 'wb') as f:
        f.write(r.content)
    print(f'Downloaded file {filename} in folder {path_data}/.')

In [None]:
import numpy as np
import pandas as pd
import pyspark.pandas as ps

from pyspark.sql import SparkSession
from pyspark import SparkConf

In [None]:
spark = SparkSession.builder.getOrCreate()

spark

In [None]:
sdf = ps.read_csv(filepath, sep=';')


In [None]:
sdf.columns

In [None]:
sdf.info()

In [None]:
sdf.describe()

In [None]:
sdf.head(5)

The last columns look empty. Moreover, they do not show up in the data description (the metadata).

In [None]:
sdf["BirthDate"].head(5)

In [None]:
type(sdf.loc[0, 'BirthDate'])

This means that dates are imported as strings...

In [None]:
sdf['Prod_Sub_Category'].head()

In [None]:
type(sdf.loc[0, 'Prod_Sub_Category'])

Categorical variables are imported as strings as well.


In [None]:
sdf['Net_Annual_Income'].head(n=10)

In [None]:
type(sdf.loc[0, 'Net_Annual_Income'])

The net annual income is imported as a string as well, while it is a number!

::: {.callout-caution}

### Caveat: there are slight differences between Pandas API on Spark and Pandas API.

:::

## Let's assess what we did

It appears that we have to work a little bit more for a correct import of the data.

We face the following problems:

- The last three columns are empty and useless
- Dates are handled as `str` (Python's **string** type)
- There are lots of missing values
- Categorical features are handled as `str`
- The `Net_Annual_Income` is imported as a string

Looking at column names, column descriptions, and using side information, we can infer the type of features. There are

- date features,
- continuous features,
- categorical features, and
- some features that could be treated either as categorical or as continuous.

Moreover:

- There are lots of missing values. This needs to be dealt with.
- The annual net income is imported as a string; we need to understand why.
- We really need to treat dates as dates and not strings (because we want to compute the age of a client based on their birth date, for instance).

Here is a tentative structure of the features:

**Continuous features**

- `Years_At_Residence`
- `Net_Annual_Income`
- `Years_At_Business`

**Features to be decided**

- `Number_Of_Dependant`
- `Nb_Of_Products`

**Categorical features**

- `Customer_Type`
- `P_Client`
- `Educational_Level`
- `Marital_Status`
- `Prod_Sub_Category`
- `Source`
- `Type_Of_Residence`
- `Prod_Category`

**Date features**

- `BirthDate`
- `Customer_Open_Date`
- `Prod_Decision_Date`
- `Prod_Closed_Date`

# A closer look at the import problems

Let's find solutions to all these import problems.

## The last three columns are weird and empty 

It seems to come from the fact that every line of the file ends with several `';'` characters.
We can simply drop these columns using the `usecols` option of `read_csv`.
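To see the effect of trailing separators, here is a minimal sketch with plain `pandas` on a tiny in-memory sample (hypothetical data, not taken from `gro.csv`). Every line ends with extra `';'`, which produces spurious empty columns unless `usecols` restricts the import to the columns that carry data.

```python
import io

import pandas as pd

# Hypothetical two-column sample where every line ends with three extra ';'
raw = "Id_Customer;Source;;;\n1;Branch;;;\n2;Sales;;;\n"

# Naive import: the trailing separators create extra, empty "Unnamed" columns
naive = pd.read_csv(io.StringIO(raw), sep=";")

# Keep only the first two columns, i.e. the ones that carry data
clean = pd.read_csv(io.StringIO(raw), sep=";", usecols=range(2))

print(naive.shape, clean.shape)
print(list(clean.columns))
```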

## Dates are actually `str`

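We need to specify which columns must be encoded as dates. In `pandas`, this is the role of the `parse_dates` option of `read_csv`, and `pandas` is usually clever enough to infer the date format. The `read_csv` function of the pandas API on Spark has no `parse_dates` option, so there the date columns have to be converted after reading.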
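With plain `pandas`, one version-robust route is to read the date columns as strings and convert them with `pd.to_datetime` and an explicit day-first format, which avoids relying on format inference. A minimal sketch on a hypothetical sample; the `dd/mm/yyyy` format is an assumption about how the dates are written.

```python
import io

import pandas as pd

# Hypothetical sample with day-first dates (dd/mm/yyyy), as in French data
raw = "BirthDate;Customer_Open_Date\n07/12/1977;15/03/2004\n01/02/1980;28/11/2010\n"

df = pd.read_csv(io.StringIO(raw), sep=";")  # dates arrive as strings

# Convert every column whose name ends with 'Date', using an explicit format
date_cols = [c for c in df.columns if c.endswith("Date")]
for c in date_cols:
    df[c] = pd.to_datetime(df[c], format="%d/%m/%Y")

print(df.dtypes)
```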


## There are lots of missing values 

A single column contains mostly missing values.


In [None]:
spam = sdf.isnull().sum()

In [None]:
type(spam)

In [None]:
spam[spam > 100]

The column `Prod_Closed_Date` contains mostly missing values!


In [None]:
sdf[['Prod_Closed_Date']].head(5)

Let's remove the useless columns and check the remaining missing values.

Again, there are differences with pandas: `drop` does not accept the `inplace` keyword in the pandas API on Spark.

::: {.callout-warning}

We select the columns to be deleted using a logical mask: `spam[spam > 100]`. The result of this expression is a pandas-on-Spark Series. To get the names of the columns to be dropped, we pick the Series index (still in the Spark world) and collect it as a `numpy` array (in the Python world). We get a warning that deserves to be read. We should be able to perform this cleaning operation without any exchange between the cluster and the driver process.

:::

In [None]:
sdf = sdf.drop(spam[spam > 100].index.to_numpy(), axis="columns")
        

In [None]:
sdf.head()         
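For comparison, in plain `pandas` the same cleaning can be expressed with a boolean mask over columns, without extracting the column names into a separate array. A minimal sketch on a hypothetical toy frame, with a hypothetical threshold; whether `.loc` with such a mask avoids a driver round trip in the pandas API on Spark is not assumed here.

```python
import numpy as np
import pandas as pd

# Hypothetical frame: column 'c' is almost entirely missing
df = pd.DataFrame({
    "a": [1, 2, 3, 4],
    "b": [1.0, np.nan, 3.0, 4.0],
    "c": [np.nan, np.nan, np.nan, 5.0],
})

max_missing = 1  # hypothetical threshold (the notebook uses 100)

# Boolean mask over columns: True where the column has few enough missing values
keep = df.isnull().sum() <= max_missing

# Select the columns with the mask instead of materialising a list of names
df_kept = df.loc[:, keep]
print(list(df_kept.columns))
```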

Let's display the rows with missing values.


In [None]:
sdf.isnull().head()

In [None]:
# sdf[sdf.isnull().any(axis="columns")].head()  # does not work


In pandas, the method `any()` can be used columnwise or rowwise, according to the optional argument `axis`. In the pandas API on Spark, `any()` works columnwise. Fortunately, a special-purpose method, `dropna()`, does the job of removing the incomplete rows.
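In plain `pandas`, the row-wise selection and `dropna(how="any")` are two sides of the same coin. A minimal sketch on a hypothetical toy frame:

```python
import numpy as np
import pandas as pd

# Hypothetical frame with one missing value in row 1 and one in row 2
df = pd.DataFrame({
    "a": [1.0, np.nan, 3.0],
    "b": ["x", "y", None],
    "c": [True, False, True],
})

# Row-wise: True for each row holding at least one missing value
has_null = df.isnull().any(axis="columns")
rows_with_nulls = df[has_null]

# dropna(how="any") keeps exactly the complementary rows
complete = df.dropna(how="any")

print(rows_with_nulls.index.tolist(), complete.index.tolist())
```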

::: {.callout-note}

### [Supported APIs](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/supported_pandas_api.html)

:::

In [None]:
sdf_without_nulls = sdf.dropna(how='any')

In [None]:
sdf_without_nulls.shape, sdf.shape

::: {.callout-note}

We need to work around this impediment. A possible workaround starts from observing that `sdf.isnull()` is a pandas-on-Spark dataframe. It can be transformed into a Spark dataframe using `to_spark()`, whose `rdd` attribute is an RDD of `Row`s. Class `Row` has few methods, but `count(value)` fits our goal well: it is enough to count the `True` values in each row.

:::
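`Row` is a subclass of Python's `tuple`, so `count(True)` behaves like `tuple.count`. One subtlety: in Python `True == 1`, so if the row also carries an integer index (as with `to_spark(index_col='idx')`), an index value equal to 1 is counted as a missing value. A minimal sketch with plain tuples standing in for `Row`s (hypothetical data) shows the pitfall and the fix of skipping the index.

```python
# Plain tuples stand in for Rows: (idx, is_null_col1, is_null_col2)
rows = [
    (0, False, False),
    (1, False, False),  # no missing value, but idx == 1 == True
    (2, True, False),
]

# Pitfall: counting over the whole row also matches the index value 1
naive = [r[0] for r in rows if r.count(True) > 0]

# Fix: count only the null flags, i.e. skip the index in position 0
fixed = [r[0] for r in rows if r[1:].count(True) > 0]

print(naive, fixed)
```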


 

In [None]:
egg = (
    sdf.isnull()
    .to_spark(index_col='idx')
    .rdd
)

In [None]:
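spam = (
    egg
      .map(lambda row: (row[0], row[1:]))          # (index, null flags of the data columns)
      .mapValues(lambda flags: flags.count(True))  # number of missing values in the row
      .filter(lambda kv: kv[1] > 0)                # rows with at least one missing value
      .map(lambda kv: kv[0])                       # keep the index only
      .collect()
)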

In [None]:
spam

In [None]:
sdf.loc[spam, :]

In [None]:
sdf = sdf_without_nulls

## Categorical features are `str`

We need to specify the dtype we want to use for some columns using the `dtype` option of `read_csv`.
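A minimal sketch with plain `pandas` on a hypothetical sample: passing `'category'` in the `dtype` dict makes `read_csv` build categorical columns directly, instead of plain strings.

```python
import io

import pandas as pd

# Hypothetical sample with two categorical columns
raw = (
    "Source;Customer_Type\n"
    "Branch;Existing Client\n"
    "Sales;Non Existing Client\n"
    "Branch;Existing Client\n"
)

# Ask read_csv to build categorical columns directly
df = pd.read_csv(
    io.StringIO(raw),
    sep=";",
    dtype={"Source": "category", "Customer_Type": "category"},
)

print(df.dtypes)
print(df["Source"].cat.categories.tolist())
```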


In [None]:
type(sdf.loc[0, 'Prod_Sub_Category'])

In [None]:
sdf['Prod_Sub_Category'].unique()

## The annual net income is imported as a string

This problem comes from the fact that the decimal separator is in European notation: it's a `','` and not a `'.'`, so we need to specify it using the `decimal` option to `read_csv`. (Data is French, pardon my French...) 
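A minimal sketch with plain `pandas` on hypothetical values shows the difference the `decimal` option makes. With `ps.read_csv`, extra keyword options are forwarded to Spark's CSV reader, so it is worth checking the resulting dtypes after the import.

```python
import io

import pandas as pd

# Hypothetical incomes written with a comma as decimal separator
raw = "Net_Annual_Income;Source\n36,5;Branch\n1200,25;Sales\n"

as_text = pd.read_csv(io.StringIO(raw), sep=";")                 # '36,5' stays a string
as_float = pd.read_csv(io.StringIO(raw), sep=";", decimal=",")  # parsed as 36.5

print(as_text["Net_Annual_Income"].dtype, as_float["Net_Annual_Income"].dtype)
```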


In [None]:
type(sdf.loc[0, 'Net_Annual_Income'])

In [None]:
sdf['Net_Annual_Income'].head(n=10)

# A correct import of the data

- We build a dict that specifies the dtype to use for each column
and pass it to `read_csv` using the `dtype` option
- We also specify the `decimal` and `usecols` options (dates are converted after reading)

**Very pro remark.** Some columns could be imported as `int`.
However, `pandas` (actually the `numpy` arrays it relies on) does not support columns
with a classic integer dtype and missing values.
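Recent `pandas` versions offer nullable integer dtypes such as `"Int64"` (capital I) for exactly this situation. A minimal sketch in plain `pandas` on hypothetical data; support for these dtypes in the pandas API on Spark is not assumed here.

```python
import io

import pandas as pd

# Hypothetical sample: the second row has no value for Nb_Of_Products
raw = "Nb_Of_Products;Source\n1;Branch\n;Sales\n3;Branch\n"

# A numpy int64 column cannot hold a missing value: read_csv refuses it
try:
    pd.read_csv(io.StringIO(raw), sep=";", dtype={"Nb_Of_Products": "int64"})
    int64_ok = True
except (ValueError, TypeError):
    int64_ok = False

# The nullable integer dtype stores the gap as <NA>
df = pd.read_csv(io.StringIO(raw), sep=";", dtype={"Nb_Of_Products": "Int64"})

print(int64_ok, df["Nb_Of_Products"].dtype, df["Nb_Of_Products"].isna().sum())
```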


In [None]:
#| eval: true
# Does not work completely
gro_dtypes = {
    'Years_At_Residence': np.int64,
    'Net_Annual_Income' : np.float64,
    'Years_At_Business': np.float64,
    'Number_Of_Dependant': np.float64,
    'Nb_Of_Products': np.int64,
    'Customer_Type': 'category',
    'P_Client': 'category',
    'Educational_Level': 'category',
    'Marital_Status': 'category',
    'Prod_Sub_Category': 'category',
    'Source': 'category',
    'Type_Of_Residence': 'category',
    'Prod_Category': 'category',
}

In [None]:
ssdf = ps.read_csv(
    filepath,  # Filename   
    sep=';',  # Column separator
    decimal=',',      # Decimal separator
    usecols=range(19), # Range of the columns to keep (remove the last three ones)
    dtype=gro_dtypes
)

In [None]:
ssdf.dtypes

::: {.callout-note}
We have not used the optional `parse_dates` argument
:::

In [None]:
col_dates = [c for c in ssdf.columns if (c.endswith('Date'))]

In [None]:
ssdf.loc[:, col_dates].head()

In [None]:
for c in col_dates:
    ssdf[c] = ps.to_datetime(ssdf[c], format='%d/%m/%Y')

In [None]:
ssdf.dtypes

In [None]:
ssdf.to_spark().explain()

In [None]:
ssdf = ssdf.to_spark().localCheckpoint()

In [None]:
ssdf.explain()

In [None]:
ssdf = ssdf.pandas_api()

In [None]:
ssdf.loc[:, col_dates].head()

In [None]:
ssdf['Prod_Sub_Category'].head()

In [None]:
truc = ssdf['Prod_Sub_Category'].head()

In [None]:
ssdf.loc[0, 'BirthDate']

Let's remove `Prod_Closed_Date` (mostly contains missing values)


In [None]:
prod_closed_date = ssdf.pop('Prod_Closed_Date')
ssdf.shape

In [None]:
# Now we save the cleaned dataset into a CSV file
ssdf.to_csv(os.path.join(path_data, "gro_spark_cleaned.csv"))

In [None]:
!pwd
!ls -l ../data/gro*csv

## Comment on file formats

You can use other methods starting with `.to_XX` to save in another format.
Here are the main examples:

- `csv` is fine for "small" datasets (several MB)
- `pickle` is the standard binary serialization format of `Python`; it is faster and more compact than `csv` (older pickle protocols are limited to 4GB objects)
- `feather` is another fast and lightweight file format for storing data frames, and a very popular exchange format
- `parquet` is a columnar format designed for big distributed data (it works nicely with `Spark`)

among several others...
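One practical difference between text and binary formats is that `csv` does not store dtypes, while `pickle` does. A minimal sketch with plain `pandas` on a hypothetical toy frame (`parquet` and `feather` would need `pyarrow`, which is not assumed here):

```python
import os
import tempfile

import pandas as pd

# Hypothetical frame mixing categorical, float and date columns
df = pd.DataFrame({
    "Source": pd.Categorical(["Branch", "Sales", "Branch"]),
    "Net_Annual_Income": [36.0, 52.5, 120.0],
    "BirthDate": pd.to_datetime(["1977-12-07", "1980-02-01", "1990-06-15"]),
})

with tempfile.TemporaryDirectory() as tmp:
    csv_path = os.path.join(tmp, "sample.csv")
    pkl_path = os.path.join(tmp, "sample.pkl")

    df.to_csv(csv_path, index=False)
    df.to_pickle(pkl_path)

    from_csv = pd.read_csv(csv_path)     # dtypes must be declared again
    from_pkl = pd.read_pickle(pkl_path)  # dtypes come back as saved

print(from_csv.dtypes.tolist())
print(from_pkl.dtypes.tolist())
```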


In [None]:
ssdf.to_parquet(os.path.join(path_data, "gro_spark_cleaned." + "parquet"))

In [None]:
ssdf.index

And you can read the data back using the corresponding `read_XX` function


In [None]:
ssdf = ps.read_parquet(os.path.join(path_data, "gro_spark_cleaned." + "parquet"))
ssdf.head()

In [None]:
!ls -l ../data/gro_spark*

## The net income column is very weird


In [None]:
income = ssdf['Net_Annual_Income']
income.describe()

In [None]:
(income <= 100).sum(), (income > 100).sum()

Most values are smaller than 100, while some are much much larger...


In [None]:
(
    ssdf["Net_Annual_Income"]
        .plot
        .hist(bins=40,
            histnorm='density',
            log_x=True
        )
)

This is annoying, we don't really see much...


In [None]:
(ssdf['Net_Annual_Income'] == 36.0).sum()

In [None]:
spam = ssdf['Net_Annual_Income'].astype("category")

In [None]:
type(spam)

In [None]:
income_counts = (
    ps.DataFrame({
        "income_category": ssdf['Net_Annual_Income'].astype("category").to_numpy(),
        "income": ssdf['Net_Annual_Income'].to_numpy()
    })
    .groupby("income_category")
    .count()
    .reset_index()
    .rename(columns={"income": "#customers"})
    .sort_values(by="#customers", ascending=False)
)
    

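::: {.callout-caution}
### This is not fluent spark!

Calling `to_numpy()` collects the whole column into the driver's memory before a new distributed dataframe is built from it.
:::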
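A more idiomatic route is `value_counts()` followed by `cumsum()`; both methods exist in `pandas` and in the pandas API on Spark, so the counting itself does not require collecting the column. A minimal sketch in plain `pandas` on hypothetical values:

```python
import pandas as pd

# Hypothetical incomes with a few over-represented values
income = pd.Series([36.0, 36.0, 36.0, 24.0, 24.0, 48.5, 1200.0], name="Net_Annual_Income")

# Number of customers per distinct income, most frequent first
counts = income.value_counts()

# Cumulative share of customers covered by the most frequent values
cum_share = counts.cumsum() / counts.sum()

print(pd.DataFrame({"#customers": counts, "%cumulative clients": cum_share}))
```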

In [None]:
spam = income_counts["#customers"].cumsum() / income_counts["#customers"].sum()

In [None]:
spam = spam.reindex(income_counts.index)

In [None]:
ps.set_option('compute.ops_on_diff_frames', True)

In [None]:
income_counts["%cumulative clients"] = spam

In [None]:
income_counts.iloc[:20].style.bar(subset=["%cumulative clients"], vmin=0, vmax=1)

- We have some over-represented values (there are many possible explanations for this)
- To clean the data, we can, for instance, keep only incomes in the range [10, 200], or leave the column as is
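In plain `pandas`, `Series.between` expresses the same inclusive range filter as the two comparisons used below. A minimal sketch on hypothetical values:

```python
import pandas as pd

# Hypothetical incomes, some outside the [10, 200] range
income = pd.Series([5.0, 10.0, 36.0, 200.0, 1200.0])

# Both bounds included, matching (x >= 10) & (x <= 200)
mask = income.between(10, 200)
kept = income[mask]

print(kept.tolist(), f"share kept: {mask.mean():.0%}")
```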


In [None]:
#| eval: false
ssdf = ssdf[(ssdf['Net_Annual_Income'] >= 10) & (ssdf['Net_Annual_Income'] <= 200)]

In [None]:
# sns.displot(x='Net_Annual_Income', data=df, bins=15, height=4, aspect=1.5)

# Final preparation of the dataset


In [None]:
# First we make lists of continuous, categorical and date features

cnt_featnames = [
    'Years_At_Residence',
    'Net_Annual_Income',
    'Years_At_Business',
    'Number_Of_Dependant'
]

cat_featnames = [
    'Customer_Type',
    'P_Client',
    'Educational_Level',
    'Marital_Status',
    'Prod_Sub_Category',
    'Source',
    'Type_Of_Residence',
    'Prod_Category',
    'Nb_Of_Products'
]

date_featnames = [
    'BirthDate',
    'Customer_Open_Date',
    'Prod_Decision_Date'
    #'Prod_Closed_Date'
]

## Creation of the features matrix


In [None]:
ssdf[cnt_featnames].head()

In [None]:
bin_features = ps.get_dummies(ssdf[cat_featnames],
                              prefix_sep='#', drop_first=True)

In [None]:
bin_features.head()
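With `drop_first=True`, the first level of each categorical feature serves as a reference and gets no column, which avoids perfectly collinear dummies. A minimal sketch in plain `pandas` on hypothetical values shows the column naming produced by `prefix_sep='#'`:

```python
import pandas as pd

# Hypothetical categorical features
df = pd.DataFrame({
    "Source": ["Branch", "Sales", "Branch"],
    "Marital_Status": ["Married", "Single", "Divorced"],
})

# One binary column per level, minus the first (sorted) level of each feature
dummies = pd.get_dummies(df, prefix_sep="#", drop_first=True)

print(dummies.columns.tolist())
```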

In [None]:
cnt_features = ssdf[cnt_featnames]
cnt_features.head()

In [None]:
from pandas import Timestamp

In [None]:
def age(x):
    today = Timestamp.today()
    return (today - x).dt.days

In [None]:
date_features = ssdf[date_featnames].apply(age, axis="index")
date_features.head()

In [None]:
today = Timestamp.today()
today
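`age` calls `Timestamp.today()` each time it runs, so the features drift from one execution to the next (and, since the pandas API on Spark may apply the function batch by batch, possibly from one batch to another). A hedged alternative is to fix the reference date once. The sketch below uses plain `pandas`; the reference date and the helper name `age_in_days` are hypothetical choices.

```python
import pandas as pd

# Hypothetical fixed reference date: keeps the features reproducible across runs
reference = pd.Timestamp("2024-01-01")

dates = pd.DataFrame({
    "BirthDate": pd.to_datetime(["2023-12-31", "2023-01-01"]),
    "Customer_Open_Date": pd.to_datetime(["2020-01-01", "2023-12-01"]),
})


def age_in_days(col: pd.Series, ref: pd.Timestamp = reference) -> pd.Series:
    """Number of whole days elapsed between each date and the reference date."""
    return (ref - col).dt.days


ages = dates.apply(age_in_days)  # applied column by column
print(ages)
```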

## Final features matrix


In [None]:
all_features = ps.concat([bin_features, cnt_features, date_features], axis=1)

In [None]:
all_features.columns

In [None]:
all_features.head()

::: {.callout-caution}

We removed rows that contained missing values. The index of the dataframe is not contiguous anymore.

:::

In [None]:
all_features.index.max()

This could be a problem later, so let's reset the index to get a contiguous one.
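A minimal sketch in plain `pandas` on a hypothetical toy frame: after `dropna()` the index has gaps, and `reset_index(drop=True)` renumbers the rows without keeping the old index as a column.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"x": [1.0, np.nan, 3.0, np.nan, 5.0]})

cleaned = df.dropna()
print(cleaned.index.tolist())  # gaps where rows were dropped

# drop=True discards the old index instead of inserting it as a column
cleaned = cleaned.reset_index(drop=True)
print(cleaned.index.tolist())
```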


In [None]:
all_features.shape

In [None]:
all_features.reset_index(inplace=True, drop=True)

In [None]:
all_features.head()

In [None]:
all_features.to_spark().explain()

In [None]:
all_features = all_features.to_spark().localCheckpoint()

all_features.explain()

In [None]:
all_features = all_features.pandas_api()

## Let's save the data using parquet


In [None]:
#| eval: false
import pickle as pkl

X = all_features
y = ssdf['Y']

# Let's put everything in a dictionary
df_pkl = {}
# The features and the labels
df_pkl['features'] = X
df_pkl['labels'] = y
# And also the list of columns we built above
df_pkl['cnt_featnames'] = cnt_featnames
df_pkl['cat_featnames'] = cat_featnames
df_pkl['date_featnames'] = date_featnames

In [None]:
# with open(os.path.join(path_data, "gro_training.pkl"), 'wb') as f:
#    pkl.dump(df_pkl, f)

In [None]:
all_features.to_parquet(os.path.join(path_data, "gro_features_training.parquet"))

In [None]:
!ls -al ../data/gro*

The preprocessed features are saved in a parquet file called `gro_features_training.parquet` (the pickle export to `gro_training.pkl` is disabled above).



[Databricks blog about Koalas, SPIP, Zen](https://www.databricks.com/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html)

> pandas users will be able scale their workloads with one simple line change in the upcoming Spark 3.2 release:

```{.python}
# from pandas import read_csv   (before)
from pyspark.pandas import read_csv
pdf = read_csv("data.csv")
```

# Pandas on Spark dataframes versus Spark SQL DataFrames

[Official comparison](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/pandas_pyspark.html)

> PySpark users can access the full PySpark APIs by calling DataFrame.to_spark(). pandas-on-Spark DataFrame and Spark DataFrame are virtually interchangeable

Pandas on Spark dataframes do not have an `rdd` attribute. We can obtain the number of partitions by first calling `to_spark()`:

In [None]:
sdf.to_spark().rdd.getNumPartitions()

- Does this require copying or moving information?
- What is the type of the result?

In [None]:
type(sdf.to_spark())

In [None]:
sdf.to_spark().show(5)

In [None]:
sdf.to_spark().pandas_api().head()

::: {.callout-caution}

### Mind the index 

Pandas on Spark dataframes do have an index (a row index), just as pandas dataframes. When converting with `.to_spark()`, you may lose the index. See the [documentation](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/pandas_pyspark.html).


:::

Inspect execution plans using the `spark` accessor: `DataFrame.spark.explain()`

In [None]:
sdf.spark.explain()

Use `spark.checkpoint()` and/or `spark.local_checkpoint()` to truncate the execution plan

In [None]:
sdf = sdf.spark.local_checkpoint()

In [None]:
sdf.spark.explain()

In [None]:
sdf.sort_values('BirthDate').spark.explain()

::: {.callout-tip}
### Use distributed or distributed-sequence default index
:::

> One common issue that pandas-on-Spark users face is the slow performance due to the default index. Pandas API on Spark attaches a default index when the index is unknown, for example, Spark DataFrame is directly converted to pandas-on-Spark DataFrame.

> Note that sequence requires the computation on a single partition which is discouraged. If you plan to handle large data in production, make it `distributed` by configuring the default index to `distributed` or `distributed-sequence`.

In [None]:
ps.get_option('compute.default_index_type')

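Do we need an index that increases one by one (0, 1, 2, ...)? If so, use `distributed-sequence`. If not, use `distributed`: it is monotonically increasing but not consecutive, and almost free performance-wise.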

::: {.callout-warning}
[Use pandas api on spark directly whenever possible](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/best_practices.html#use-pandas-api-on-spark-directly-whenever-possible)
:::

Pandas-on-Spark datasets are meant to live across multiple machines, and they are computed in a distributed manner. They are hard to iterate over locally, and it is very easy to collect the entire dataset to the client side without noticing it.

Therefore, it is best to stick to using pandas-on-Spark APIs. 

## Multitable operations