Overview of Spark data support in the Ambrosia Splitter class
This example shows the functionality of the Splitter class on Spark DataFrames, using synthetic data on MTS KION user metrics.
The functionality of the Splitter class on Spark data is currently limited compared to the pandas format. See the main Splitter tutorial on pandas data to learn the full functionality and details of splitting experimental objects into groups.
Note: Ambrosia currently supports only batch splitting. Real-time splitting tools are under development.
[2]:
import os
import pandas as pd
import pyspark
from ambrosia.splitter import Splitter
Build a local Spark session
[3]:
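# Bind Spark to localhost and start a single-core local session
os.environ['SPARK_LOCAL_IP'] = '127.0.0.1'
spark = pyspark.sql.SparkSession.builder.master("local[1]").getOrCreate()
# Show only errors from Spark's logger in the notebook output
spark.sparkContext.setLogLevel('ERROR')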
Create Spark DataFrame
[4]:
# Load the synthetic KION user metrics with pandas, then convert them to a Spark DataFrame
kion_dataset = pd.read_csv("./../tests/test_data/kion_data.csv", sep=';')
sdf = spark.createDataFrame(kion_dataset)
[5]:
kion_dataset.shape
[5]:
(300000, 5)
[6]:
sdf.printSchema()
root
|-- profile_id: long (nullable = true)
|-- sum_dur: long (nullable = true)
|-- vod_cnt: long (nullable = true)
|-- ln_vod_cnt: double (nullable = true)
|-- bin_col: long (nullable = true)
Spark hash group split
Unlike for pandas data, only the 'hash' method is implemented for Spark. This method creates groups deterministically using the salt parameter: with the same data and salt, the split is reproduced exactly.
Set the data and the name of the column with unique object ids
[7]:
splitter = Splitter(dataframe=sdf, id_column='profile_id')
Make a hash split into 2 groups of 1000 objects each with a specified salt value
[8]:
sdf_hash_split = splitter.run(groups_size=1000, method='hash', salt='spark322')
[10]:
sdf_hash_split.toPandas()
[10]:
| | profile_id | sum_dur | vod_cnt | ln_vod_cnt | bin_col | group |
|---|---|---|---|---|---|---|
| 0 | 559783878399 | 16243096 | 26 | 3.451662 | 1 | A |
| 1 | 807427182946 | 55078 | 3 | 0.909034 | 0 | A |
| 2 | 845784297949 | 31545 | 1 | 0.000000 | 0 | A |
| 3 | 41350284663 | 1878050 | 10 | 2.894374 | 0 | A |
| 4 | 5082903657 | 584191 | 1 | 0.475820 | 0 | A |
| ... | ... | ... | ... | ... | ... | ... |
| 1995 | 449871171656 | 5890763 | 29 | 3.699892 | 1 | B |
| 1996 | 25374705733 | 3964937 | 51 | 4.053246 | 1 | B |
| 1997 | 368955636652 | 27693 | 1 | 0.000000 | 0 | B |
| 1998 | 674408525538 | 7284 | 1 | 0.000000 | 0 | B |
| 1999 | 942809058983 | 8173296 | 43 | 4.002533 | 1 | B |
2000 rows × 6 columns
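The hash method is what makes this split reproducible: for a fixed salt, every object always lands in the same group. Below is a minimal pure-Python sketch of that idea, assuming the ids are ordered by a salted SHA-256 hash and then cut into consecutive groups. `salted_key` and `salted_hash_split` are hypothetical helpers for illustration only; Ambrosia's actual hashing and group assignment may differ.

```python
import hashlib


def salted_key(object_id, salt):
    """Reproducible pseudo-random sort key: SHA-256 of the id joined with the salt."""
    return hashlib.sha256(f"{object_id}:{salt}".encode("utf-8")).hexdigest()


def salted_hash_split(ids, groups_size, groups_number=2, salt=""):
    """Hypothetical sketch (not Ambrosia's implementation) of a salted hash split.

    Orders ids by their salted hash and cuts the first
    groups_size * groups_number ids into consecutive groups labelled A, B, C, ...
    Returns {object_id: group_label}; ids beyond that prefix stay unassigned.
    """
    if groups_number > 26:
        raise ValueError("this sketch only supports up to 26 single-letter labels")
    needed = groups_size * groups_number
    if needed > len(ids):
        raise ValueError("not enough objects for the requested groups")
    # The same ids and salt always give the same order, hence the same groups.
    ordered = sorted(ids, key=lambda obj_id: salted_key(obj_id, salt))
    labels = [chr(ord("A") + k) for k in range(groups_number)]
    return {obj_id: labels[pos // groups_size] for pos, obj_id in enumerate(ordered[:needed])}


# Usage: dummy ids standing in for profile_id, 5 groups of 1000 objects each.
ids = list(range(10_000))
split = salted_hash_split(ids, groups_size=1000, groups_number=5, salt="spark322")
same = salted_hash_split(ids, groups_size=1000, groups_number=5, salt="spark322")
print(split == same)  # True: same salt, same groups
```

Sorting by the salted hash, rather than taking the hash modulo the number of groups, is a design choice made here so that every group gets exactly `groups_size` objects; changing the salt reshuffles the order and therefore the groups.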
Now make 5 different groups of 1000 objects each
[14]:
sdf_hash_split_multi = splitter.run(groups_size=1000,
groups_number=5,
method='hash',
salt='spark322')
[15]:
hash_split_multi = sdf_hash_split_multi.toPandas()
Five groups of 1000 objects each are created
[20]:
hash_split_multi.group.value_counts()
[20]:
A 1000
B 1000
C 1000
D 1000
E 1000
Name: group, dtype: int64
Check the distribution of the binary variable bin_col across the groups
[16]:
hash_split_multi.groupby('group').agg({"bin_col": "value_counts"}) / 1000
[16]:
| group | bin_col | share |
|---|---|---|
| A | 0 | 0.615 |
| A | 1 | 0.385 |
| B | 0 | 0.593 |
| B | 1 | 0.407 |
| C | 0 | 0.598 |
| C | 1 | 0.402 |
| D | 0 | 0.611 |
| D | 1 | 0.389 |
| E | 0 | 0.611 |
| E | 1 | 0.389 |
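A back-of-the-envelope check suggests that the spread between groups here is ordinary sampling noise. Assuming each group of n = 1000 objects behaves like a simple random sample, and taking p ≈ 0.609 for the share of bin_col = 0 (the value every group shows in the stratified split later in this notebook), the standard error of a group's observed share is:

```latex
\mathrm{SE} = \sqrt{\frac{p(1-p)}{n}} = \sqrt{\frac{0.609 \cdot 0.391}{1000}} \approx 0.0154
```

The observed shares of zeros, from 0.593 to 0.615, all lie within roughly one standard error of 0.609, which is consistent with an unbiased random split; stratification removes this random spread.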
Finally, make the split with the same parameters, but with stratification by bin_col
[19]:
sdf_strat_hash_split_multi = splitter.run(groups_size=1000,
strat_columns=['bin_col'],
groups_number=5,
method='hash',
salt='spark322')
[21]:
strat_hash_split_multi = sdf_strat_hash_split_multi.toPandas()
Due to stratification, bin_col has the same distribution in every group, matching its distribution in the source table
[22]:
strat_hash_split_multi.groupby('group').agg({"bin_col": "value_counts"}) / 1000
[22]:
| group | bin_col | share |
|---|---|---|
| A | 0 | 0.609 |
| A | 1 | 0.391 |
| B | 0 | 0.609 |
| B | 1 | 0.391 |
| C | 0 | 0.609 |
| C | 1 | 0.391 |
| D | 0 | 0.609 |
| D | 1 | 0.391 |
| E | 0 | 0.609 |
| E | 1 | 0.391 |
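Stratification explains why the shares above coincide across groups: objects are split separately inside each stratum, and every group receives the same number of objects from each stratum. Below is a minimal sketch of that idea, assuming a per-stratum salted SHA-256 ordering. `stratified_hash_split` and `salted_key` are hypothetical helpers, not Ambrosia's API, and the library's exact allocation rule may differ.

```python
import hashlib
from collections import defaultdict


def salted_key(object_id, salt):
    """Reproducible pseudo-random sort key: SHA-256 of the id joined with the salt."""
    return hashlib.sha256(f"{object_id}:{salt}".encode("utf-8")).hexdigest()


def stratified_hash_split(ids, strata, groups_size, groups_number=2, salt=""):
    """Hypothetical sketch (not Ambrosia's implementation) of a stratified hash split.

    strata[i] is the stratum value (e.g. bin_col) of ids[i]. Each group gets
    round(groups_size * stratum_share) objects from every stratum, so stratum
    shares match across groups; rounding can make group sizes differ slightly
    from groups_size when shares are not exact multiples of 1 / groups_size.
    """
    if groups_number > 26:
        raise ValueError("this sketch only supports up to 26 single-letter labels")
    by_stratum = defaultdict(list)
    for obj_id, stratum in zip(ids, strata):
        by_stratum[stratum].append(obj_id)
    labels = [chr(ord("A") + k) for k in range(groups_number)]
    assignment = {}
    for members in by_stratum.values():
        # Number of objects this stratum contributes to each group.
        per_group = round(groups_size * len(members) / len(ids))
        if per_group * groups_number > len(members):
            raise ValueError("not enough objects in a stratum for the requested groups")
        # Salted hash order inside the stratum keeps the draw reproducible.
        ordered = sorted(members, key=lambda obj_id: salted_key(obj_id, salt))
        for pos, obj_id in enumerate(ordered[: per_group * groups_number]):
            assignment[obj_id] = labels[pos // per_group]
    return assignment


# Usage: dummy ids with a 60/40 binary stratum, 5 groups of 1000 objects each.
ids = list(range(10_000))
strata = [0 if i % 10 < 6 else 1 for i in ids]
split = stratified_hash_split(ids, strata, groups_size=1000, groups_number=5, salt="spark322")
zeros_in_a = sum(1 for i, g in split.items() if g == "A" and strata[i] == 0)
print(zeros_in_a / 1000)  # 0.6
```

In this sketch every group ends up with exactly the stratum's overall share (600 zeros and 400 ones per group of 1000), which mirrors the identical per-group shares in the stratified Spark output above.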
[23]:
spark.stop()