Overview of Ambrosia Splitter class support for Spark data

This example shows the functionality of the Splitter class on Spark DataFrames. It uses synthetic data on MTS KION user metrics.

The functionality of the Splitter class on Spark data is currently limited compared to the pandas format.
See the main Splitter tutorial on pandas data to learn the full functionality and details of splitting experimental objects into groups.

Note: Ambrosia currently supports only batch splitting. Real-time splitting tools are under development.

[2]:
import os

import pandas as pd
import pyspark

from ambrosia.splitter import Splitter

Build a local Spark session

[3]:
# Bind Spark to the loopback interface
os.environ['SPARK_LOCAL_IP'] = '127.0.0.1'
# Local session with a single worker thread
spark = pyspark.sql.SparkSession.builder.master("local[1]").getOrCreate()
spark.sparkContext.setLogLevel('ERROR')

Create Spark DataFrame

[4]:
kion_dataset = pd.read_csv("./../tests/test_data/kion_data.csv", sep=';')
sdf = spark.createDataFrame(kion_dataset)
[5]:
kion_dataset.shape
[5]:
(300000, 5)
[6]:
sdf.printSchema()
root
 |-- profile_id: long (nullable = true)
 |-- sum_dur: long (nullable = true)
 |-- vod_cnt: long (nullable = true)
 |-- ln_vod_cnt: double (nullable = true)
 |-- bin_col: long (nullable = true)

Spark hash group split

Unlike with pandas data, only the "hash" method is implemented for Spark.
This method creates groups deterministically: the same salt value always produces the same split.
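To illustrate why a salt makes hash splitting deterministic, here is a minimal pure-Python sketch. It is an assumption for illustration, not Ambrosia's implementation: the helper names salted_hash and hash_split are hypothetical, SHA-256 is an arbitrary choice of hash, and the selection rule (order ids by salted hash, keep the first groups_size × groups_number, cut into consecutive chunks) is just one way to get exactly sized groups.

```python
import hashlib
from collections import Counter
from string import ascii_uppercase


def salted_hash(object_id, salt):
    """Deterministic integer hash of an object id combined with a salt."""
    payload = f"{object_id}{salt}".encode("utf-8")
    return int(hashlib.sha256(payload).hexdigest(), 16)


def hash_split(ids, groups_size, groups_number=2, salt=""):
    """Hypothetical salted hash split (not Ambrosia's code).

    Orders ids by their salted hash, keeps the first
    groups_size * groups_number of them, and labels consecutive
    chunks of groups_size as 'A', 'B', ... (at most 26 groups).
    """
    total = groups_size * groups_number
    if total > len(ids):
        raise ValueError("not enough objects for the requested groups")
    ordered = sorted(ids, key=lambda object_id: salted_hash(object_id, salt))
    return {
        object_id: ascii_uppercase[position // groups_size]
        for position, object_id in enumerate(ordered[:total])
    }


ids = list(range(10_000))
split_1 = hash_split(ids, groups_size=1000, groups_number=5, salt="spark322")
split_2 = hash_split(ids, groups_size=1000, groups_number=5, salt="spark322")
print(split_1 == split_2)  # True: the same salt reproduces the same groups
print(set(Counter(split_1.values()).values()))  # {1000}
```

Because the order depends only on the ids and the salt, not on row order or partitioning, the assignment is reproducible across runs; changing the salt reshuffles objects between groups.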

Set the data and the name of the column with unique object IDs

[7]:
splitter = Splitter(dataframe=sdf, id_column='profile_id')

Make a hash split into 2 groups of 1000 objects each with the specified salt value

[8]:
sdf_hash_split = splitter.run(groups_size=1000, method='hash', salt='spark322')
[10]:
sdf_hash_split.toPandas()

[10]:
        profile_id   sum_dur  vod_cnt  ln_vod_cnt  bin_col  group
0     559783878399  16243096       26    3.451662        1      A
1     807427182946     55078        3    0.909034        0      A
2     845784297949     31545        1    0.000000        0      A
3      41350284663   1878050       10    2.894374        0      A
4       5082903657    584191        1    0.475820        0      A
...            ...       ...      ...         ...      ...    ...
1995  449871171656   5890763       29    3.699892        1      B
1996   25374705733   3964937       51    4.053246        1      B
1997  368955636652     27693        1    0.000000        0      B
1998  674408525538      7284        1    0.000000        0      B
1999  942809058983   8173296       43    4.002533        1      B

2000 rows × 6 columns

Now make 5 different groups of 1000 objects each

[14]:
sdf_hash_split_multi = splitter.run(groups_size=1000,
                                    groups_number=5,
                                    method='hash',
                                    salt='spark322')
[15]:
hash_split_multi = sdf_hash_split_multi.toPandas()

Five groups of 1000 objects each are created

[20]:
hash_split_multi.group.value_counts()
[20]:
A    1000
B    1000
C    1000
D    1000
E    1000
Name: group, dtype: int64

Check the distribution of the binary variable bin_col in each group

[16]:
hash_split_multi.groupby('group').agg({"bin_col": "value_counts"}) / 1000
[16]:
               bin_col
group bin_col
A     0          0.615
      1          0.385
B     0          0.593
      1          0.407
C     0          0.598
      1          0.402
D     0          0.611
      1          0.389
E     0          0.611
      1          0.389
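Dividing the counts by 1000 works here only because every group has exactly 1000 objects. A sketch of a size-independent alternative uses pandas' crosstab with normalize="index", which divides each row by that group's own total. The toy frame below is hypothetical and only stands in for hash_split_multi.

```python
import pandas as pd

# Hypothetical toy frame standing in for hash_split_multi (not the KION data)
toy = pd.DataFrame({
    "group": ["A", "A", "A", "A", "B", "B", "B", "B"],
    "bin_col": [0, 0, 0, 1, 0, 0, 1, 1],
})

# normalize="index" divides each row by that group's own size,
# so the shares are correct even when groups differ in size
shares = pd.crosstab(toy["group"], toy["bin_col"], normalize="index")
print(shares)
```

On the notebook data the equivalent call would be pd.crosstab(hash_split_multi["group"], hash_split_multi["bin_col"], normalize="index"), which should give the same shares as above since all groups have 1000 objects.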

Finally, make a split with the same parameters, but stratified by bin_col

[19]:
sdf_strat_hash_split_multi = splitter.run(groups_size=1000,
                                          strat_columns=['bin_col'],
                                          groups_number=5,
                                          method='hash',
                                          salt='spark322')
[21]:
strat_hash_split_multi = sdf_strat_hash_split_multi.toPandas()

Due to stratification, the binary variable has the same distribution in every group, matching its distribution in the source table
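One way to obtain identical per-group shares is to fix, for every stratum, a per-group quota proportional to that stratum's share in the source data, and then fill each quota in salted-hash order. The sketch below assumes this quota scheme; it is a hypothetical illustration, not Ambrosia's stratified hash algorithm, and the names stratum_quotas and stratified_hash_split are made up. The toy strata use a 60.9% / 39.1% mix only to mirror the shares in the output.

```python
import hashlib
from collections import Counter, defaultdict
from string import ascii_uppercase


def salted_hash(object_id, salt):
    """Deterministic integer hash of an object id combined with a salt."""
    payload = f"{object_id}{salt}".encode("utf-8")
    return int(hashlib.sha256(payload).hexdigest(), 16)


def stratum_quotas(groups_size, counts):
    """Split groups_size across strata in proportion to their source counts.

    Floor quotas plus the largest-remainder rule, so they sum to groups_size.
    """
    total = sum(counts.values())
    exact = {value: groups_size * count / total for value, count in counts.items()}
    quotas = {value: int(share) for value, share in exact.items()}
    leftover = groups_size - sum(quotas.values())
    by_remainder = sorted(exact, key=lambda v: exact[v] - quotas[v], reverse=True)
    for value in by_remainder[:leftover]:
        quotas[value] += 1
    return quotas


def stratified_hash_split(strata, groups_size, groups_number=2, salt=""):
    """Hypothetical stratified salted hash split (not Ambrosia's code).

    strata maps each object id to its stratum value. Every group gets the
    same quota from each stratum; within a stratum, objects are taken in
    salted-hash order and handed out in consecutive chunks.
    """
    by_stratum = defaultdict(list)
    for object_id, value in strata.items():
        by_stratum[value].append(object_id)

    quotas = stratum_quotas(
        groups_size, {value: len(ids) for value, ids in by_stratum.items()}
    )
    labels = {}
    for value, ids in by_stratum.items():
        need = quotas[value]
        if need * groups_number > len(ids):
            raise ValueError(f"stratum {value!r} is too small")
        ordered = sorted(ids, key=lambda object_id: salted_hash(object_id, salt))
        for group in range(groups_number):
            for object_id in ordered[group * need:(group + 1) * need]:
                labels[object_id] = ascii_uppercase[group]
    return labels


# Toy strata: 6090 objects with value 0 and 3910 with value 1
strata = {object_id: 0 if object_id < 6090 else 1 for object_id in range(10_000)}
split = stratified_hash_split(strata, groups_size=1000, groups_number=5, salt="spark322")
shares = Counter((split[object_id], strata[object_id]) for object_id in split)
print(shares[("A", 0)], shares[("A", 1)])  # 609 391
```

With fixed quotas every group receives exactly the same number of objects from each stratum, which is why the shares come out equal across groups rather than merely close, as they were in the unstratified split.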

[22]:
strat_hash_split_multi.groupby('group').agg({"bin_col": "value_counts"}) / 1000
[22]:
               bin_col
group bin_col
A     0          0.609
      1          0.391
B     0          0.609
      1          0.391
C     0          0.609
      1          0.391
D     0          0.609
      1          0.391
E     0          0.609
      1          0.391
[23]:
spark.stop()