
Feature Engineering using PySpark
# Initializing a Spark session
from pyspark.sql import SparkSession
spark = (SparkSession.builder.master("local").appName("Feature Engineering")
         .config("spark.some.config.option", "some-value").getOrCreate())
# Load the example datasets used throughout this post
sales = (spark.read.format("csv").option("header", "true").option("inferSchema", "true")
         .load(r"data/retail-data/by-day/*.csv").coalesce(5).where("Description IS NOT NULL"))
fakeIntDF = spark.read.parquet("/home/spark/DhirajR/Spark/feature_engineering/data/simple-ml-integers")
simpleDF = spark.read.json(r"/home/spark/DhirajR/Spark/feature_engineering/data/simple-ml")
scaleDF = spark.read.parquet(r"/home/spark/DhirajR/Spark/feature_engineering/data/simple-ml-scaling")

VectorAssembler

VectorAssembler concatenates a set of input columns into a single vector column, the feature format that Spark ML estimators expect.

fakeIntDF.cache()
fakeIntDF.show()
+----+----+----+
|int1|int2|int3|
+----+----+----+
|   7|   8|   9|
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+
from pyspark.ml.feature import VectorAssembler
# Combine the three integer columns into a single "features" vector
assembler = VectorAssembler(inputCols=["int1", "int2", "int3"], outputCol="features")
assembler.transform(fakeIntDF).show()
+----+----+----+-------------+
|int1|int2|int3|     features|
+----+----+----+-------------+
|   7|   8|   9|[7.0,8.0,9.0]|
|   1|   2|   3|[1.0,2.0,3.0]|
|   4|   5|   6|[4.0,5.0,6.0]|
+----+----+----+-------------+
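
By default VectorAssembler raises an error when an input column contains nulls or NaNs. A minimal sketch of the tolerant variant, using the same columns as above (handleInvalid accepts "error", "skip" or "keep"):

# Let us skip rows with null/NaN values instead of failing
safe_assembler = VectorAssembler(
    inputCols=["int1", "int2", "int3"],
    outputCol="features",
    handleInvalid="skip",
)
safe_assembler.transform(fakeIntDF).show()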

Bucketing

Bucketizer turns a continuous column into discrete buckets by assigning each value the index of the split interval it falls into.

data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])
from pyspark.ml.feature import Bucketizer
# Four buckets: [-inf, -0.5), [-0.5, 0.0), [0.0, 0.5), [0.5, inf]
bucketBorders = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
bucketer = Bucketizer().setSplits(bucketBorders).setInputCol("features").setOutputCol("Buckets")
bucketer.transform(dataFrame).show()
+--------+-------+
|features|Buckets|
+--------+-------+
|  -999.9|    0.0|
|    -0.5|    1.0|
|    -0.3|    1.0|
|     0.0|    2.0|
|     0.2|    2.0|
|   999.9|    3.0|
+--------+-------+
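
Bucketizer errors out on NaN values. A minimal sketch, reusing the bucketBorders above, that routes such values into an extra bucket instead:

# Let us keep invalid values in an extra bucket rather than failing
tolerant_bucketer = (
    Bucketizer()
    .setSplits(bucketBorders)
    .setHandleInvalid("keep")  # NaN values land in an additional bucket
    .setInputCol("features")
    .setOutputCol("Buckets")
)
tolerant_bucketer.transform(dataFrame).show()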

Scaling and normalization

StandardScaler rescales every feature column by its sample standard deviation; by default it does not subtract the mean, so the data is scaled but not centered.

scaleDF.show()
+---+--------------+
| id|      features|
+---+--------------+
|  0|[1.0,0.1,-1.0]|
|  1| [2.0,1.1,1.0]|
|  0|[1.0,0.1,-1.0]|
|  1| [2.0,1.1,1.0]|
|  1|[3.0,10.1,3.0]|
+---+--------------+
from pyspark.ml.feature import StandardScaler
# Let us scale every feature column to unit standard deviation
Scalerizer = StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
Scalerizer.fit(scaleDF).transform(scaleDF).show(truncate=False)
+---+--------------+-----------------------------+
|id |features      |Scaled_features              |
+---+--------------+-----------------------------+
|0  |[1.0,0.1,-1.0]|[1.195229,0.023376,-0.597614]|
|1  |[2.0,1.1,1.0] |[2.390457,0.257138,0.597614] |
|0  |[1.0,0.1,-1.0]|[1.195229,0.023376,-0.597614]|
|1  |[2.0,1.1,1.0] |[2.390457,0.257138,0.597614] |
|1  |[3.0,10.1,3.0]|[3.585686,2.360999,1.792842] |
+---+--------------+-----------------------------+
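
Note that duplicate rows scale identically. A minimal sketch of the centering variant, using the standard setWithMean/setWithStd setters (centering densifies sparse vectors, so use it with care):

# Let us also center each feature at zero before scaling
centering_scaler = (
    StandardScaler()
    .setWithMean(True)
    .setWithStd(True)
    .setInputCol("features")
    .setOutputCol("Centered_features")
)
centering_scaler.fit(scaleDF).transform(scaleDF).show(truncate=False)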

MinMaxScaler

MinMaxScaler linearly rescales each feature column into a target range, here [5, 10]: the column minimum maps to 5 and the column maximum to 10.

from pyspark.ml.feature import MinMaxScaler
# Let us create an object of the MinMaxScaler class
MinMaxScalerizer = MinMaxScaler().setMin(5).setMax(10).setInputCol("features").setOutputCol("MinMax_Scaled_features")
MinMaxScalerizer.fit(scaleDF).transform(scaleDF).show()
+---+--------------+----------------------+
| id|      features|MinMax_Scaled_features|
+---+--------------+----------------------+
|  0|[1.0,0.1,-1.0]|         [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|         [7.5,5.5,7.5]|
|  0|[1.0,0.1,-1.0]|         [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|         [7.5,5.5,7.5]|
|  1|[3.0,10.1,3.0]|      [10.0,10.0,10.0]|
+---+--------------+----------------------+
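
The second row is easy to verify by hand: MinMaxScaler maps a value e in a column with minimum Emin and maximum Emax to (e - Emin) / (Emax - Emin) * (max - min) + min. A quick sanity check in plain Python (minmax is a hypothetical helper, not a Spark API):

# First feature column holds 1.0, 2.0, 3.0 and is rescaled into [5, 10]
def minmax(e, e_min, e_max, lo=5.0, hi=10.0):
    return (e - e_min) / (e_max - e_min) * (hi - lo) + lo

print(minmax(2.0, 1.0, 3.0))  # 7.5, matching the second row above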

MaxAbsScaler

MaxAbsScaler divides each feature column by its maximum absolute value, mapping every feature into [-1, 1] without shifting or centering the data.

from pyspark.ml.feature import MaxAbsScaler
# Let us create an object of the MaxAbsScaler class
MaxAbsScalerizer = MaxAbsScaler().setInputCol("features").setOutputCol("MaxAbs_Scaled_features")
MaxAbsScalerizer.fit(scaleDF).transform(scaleDF).show(truncate=False)
+---+--------------+-----------------------------+
|id |features      |MaxAbs_Scaled_features       |
+---+--------------+-----------------------------+
|0  |[1.0,0.1,-1.0]|[0.333333,0.009901,-0.333333]|
|1  |[2.0,1.1,1.0] |[0.666667,0.108911,0.333333] |
|0  |[1.0,0.1,-1.0]|[0.333333,0.009901,-0.333333]|
|1  |[2.0,1.1,1.0] |[0.666667,0.108911,0.333333] |
|1  |[3.0,10.1,3.0]|[1.0,1.0,1.0]                |
+---+--------------+-----------------------------+
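
A quick sanity check: MaxAbsScaler simply divides each column by its largest absolute value, which is 3.0, 10.1 and 3.0 for the three columns here:

print(1.1 / 10.1)   # ~0.108911, the middle entry of the second row
print(-1.0 / 3.0)   # ~-0.333333, the last entry of the first row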

ElementwiseProduct

ElementwiseProduct multiplies each feature vector element-wise by a fixed scaling vector (a Hadamard product), so every dimension is scaled independently.

from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
# Let us define a scaling vector
ScalebyVector = Vectors.dense([10, 0.1, -1])
# Let us create an object of the ElementwiseProduct class
ScalingUp = ElementwiseProduct().setScalingVec(ScalebyVector).setInputCol("features").setOutputCol("ElementWiseProduct")
# Let us transform
ScalingUp.transform(scaleDF).show(truncate=False)
+---+--------------+------------------+
|id |features      |ElementWiseProduct|
+---+--------------+------------------+
|0  |[1.0,0.1,-1.0]|[10.0,0.01,1.0]   |
|1  |[2.0,1.1,1.0] |[20.0,0.11,-1.0]  |
|0  |[1.0,0.1,-1.0]|[10.0,0.01,1.0]   |
|1  |[2.0,1.1,1.0] |[20.0,0.11,-1.0]  |
|1  |[3.0,10.1,3.0]|[30.0,1.01,-3.0]  |
+---+--------------+------------------+
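
A quick sanity check of the Hadamard product for the second row, in plain Python:

# Multiply [2.0, 1.1, 1.0] element-wise by the scaling vector [10, 0.1, -1]
print([a * b for a, b in zip([2.0, 1.1, 1.0], [10, 0.1, -1])])
# [20.0, 0.11000000000000001, -1.0]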

Normalizer

Normalizer rescales each vector to unit p-norm, where the p-norm is (sum over i of |x_i|^p)^(1/p): p=1 divides by the sum of absolute values, p=2 by the Euclidean length, and p=inf by the maximum absolute element.
from pyspark.ml.feature import Normalizer
l1_norm=Normalizer().setP(1).setInputCol("features").setOutputCol("l1_norm")
l2_norm=Normalizer().setP(2).setInputCol("features").setOutputCol("l2_norm")
linf_norm=Normalizer().setP(float("inf")).setInputCol("features").setOutputCol("linf_norm")
# Let us transform
l1_norm.transform(scaleDF).show(truncate=False)
+---+--------------+---------------------------+
|id |features      |l1_norm                    |
+---+--------------+---------------------------+
|0  |[1.0,0.1,-1.0]|[0.47619,0.047619,-0.47619]|
|1  |[2.0,1.1,1.0] |[0.48780,0.26829,0.24390]  |
|0  |[1.0,0.1,-1.0]|[0.47619,0.047619,-0.47619]|
|1  |[2.0,1.1,1.0] |[0.48780,0.26829,0.24390]  |
|1  |[3.0,10.1,3.0]|[0.18633,0.62732,0.18633]  |
+---+--------------+---------------------------+
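
The l2_norm and linf_norm transformers defined above work the same way; only the denominator changes (the Euclidean length and the maximum absolute value, respectively):

l2_norm.transform(scaleDF).show(truncate=False)
linf_norm.transform(scaleDF).show(truncate=False)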

StringIndexer (Converting strings to numerical values)

StringIndexer maps a string column to numeric label indices, assigning 0.0 to the most frequent value, 1.0 to the next, and so on.

simpleDF.show(5)
+-----+----+------+------------------+
|color| lab|value1|            value2|
+-----+----+------+------------------+
|green|good|     1|14.386294994851129|
| blue| bad|     8|14.386294994851129|
| blue| bad|    12|14.386294994851129|
|green|good|    15| 38.97187133755819|
|green|good|    12|14.386294994851129|
+-----+----+------+------------------+
only showing top 5 rows
from pyspark.ml.feature import StringIndexer
# Let us create an object of the class StringIndexer
lblindexer=StringIndexer().setInputCol("lab").setOutputCol("LabelIndexed")
# Let us transform
idxRes=lblindexer.fit(simpleDF).transform(simpleDF)
idxRes=idxRes.drop("value1","value2")
idxRes.show(5)
+-----+----+------------+
|color| lab|LabelIndexed|
+-----+----+------------+
|green|good|         1.0|
| blue| bad|         0.0|
| blue| bad|         0.0|
|green|good|         1.0|
|green|good|         1.0|
+-----+----+------------+
only showing top 5 rows
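
Because indices are assigned by descending frequency, "bad" received 0.0 here. The fitted model exposes the learned mapping:

# Labels in index order: position 0 is the most frequent label
indexer_model = lblindexer.fit(simpleDF)
print(indexer_model.labels)  # ['bad', 'good']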

IndexToString

IndexToString reverses StringIndexer: it maps label indices back to the original strings using the metadata StringIndexer attached to the indexed column.

from pyspark.ml.feature import IndexToString
LabelReverse = IndexToString().setInputCol("LabelIndexed").setOutputCol("ReverseIndex")
LabelReverse.transform(idxRes).show(5)
+-----+----+------------+------------+
|color| lab|LabelIndexed|ReverseIndex|
+-----+----+------------+------------+
|green|good|         1.0|        good|
| blue| bad|         0.0|         bad|
| blue| bad|         0.0|         bad|
|green|good|         1.0|        good|
|green|good|         1.0|        good|
+-----+----+------------+------------+
only showing top 5 rows
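
If the column metadata is missing (for example after reading the indexed data back from disk), the labels can be passed explicitly instead; a minimal sketch:

# Supply the labels directly instead of relying on column metadata
explicit_reverse = IndexToString(
    inputCol="LabelIndexed", outputCol="ReverseIndex", labels=["bad", "good"]
)
explicit_reverse.transform(idxRes).show(5)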

Indexing within Vectors

VectorIndexer does for vector columns what StringIndexer does for string columns: it detects which dimensions of a feature vector are categorical and indexes them (see the sketch after the output below). First, a small example DataFrame:

from pyspark.ml.linalg import Vectors
dataln = spark.createDataFrame([
    (Vectors.dense(1, 2, 3), 1),
    (Vectors.dense(2, 5, 6), 2),
    (Vectors.dense(1, 8, 9), 3),
]).toDF("features", "labels")
dataln.show()
+-------------+------+
|     features|labels|
+-------------+------+
|[1.0,2.0,3.0]|     1|
|[2.0,5.0,6.0]|     2|
|[1.0,8.0,9.0]|     3|
+-------------+------+
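
A minimal sketch of applying VectorIndexer to this DataFrame: any dimension with at most maxCategories distinct values is treated as categorical and re-indexed, the rest pass through as continuous. maxCategories=2 is chosen here so that only the first column, with distinct values {1.0, 2.0}, qualifies:

from pyspark.ml.feature import VectorIndexer
# Index vector dimensions with at most 2 distinct values as categorical
vec_indexer = VectorIndexer(maxCategories=2, inputCol="features", outputCol="indexed")
vec_indexer.fit(dataln).transform(dataln).show()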

Conclusion

Spark ML ships a broad toolbox of feature transformers: VectorAssembler to build feature vectors, Bucketizer for discretization, StandardScaler, MinMaxScaler, MaxAbsScaler, ElementwiseProduct and Normalizer for scaling and normalization, and StringIndexer, IndexToString and VectorIndexer for categorical data. Together they cover most of the feature engineering needed before fitting a model.