

Top PySpark Features to Become an ETL Master


Become a master of PySpark for ETL, one of the most used Python tools for big data analysis.


As we know, the volume of data grows by the hour: really, really big data. The amount of newly created data in 2020 was predicted to grow 44X to reach 35 zettabytes (35 trillion gigabytes), and most of it is raw data. So the scientific problem is how to research and investigate big data to find the meaning hidden in it (data mining). Fortunately, excellent developers have built many tools and frameworks for processing and ETL (Extract, Transform, Load) of big data, and one of them is Spark.

So What is Spark and Why?

Spark is a general-purpose distributed data processing engine that is suitable for use in a wide range of circumstances. On top of the Spark core data processing engine, there are libraries for SQL, machine learning, graph computation, and stream processing, which can be used together in an application. Programming languages supported by Spark include: Java, Python, Scala, and R. Application developers and data scientists incorporate Spark into their applications to rapidly query, analyze, and transform data at scale. Tasks most frequently associated with Spark include ETL and SQL batch jobs across large data sets, processing of streaming data from sensors, IoT, or financial systems, and machine learning tasks.
You can read more about Spark here. Some of the topics covered there include MapReduce, how Spark runs on clusters, Spark Streaming, and how it runs pipelines.
In this fundamental topic, I focus only on Spark SQL for data processing and use Python for the examples (PySpark).
The dataset used for practice is the movie dataset from the Megogo Challenge on Kaggle. You can view and download it here.
Note: We assume you have some knowledge of data frames (maybe a little Pandas). If not, you can read my blog about Pandas here first. If you already know Pandas, I think you will learn PySpark for ETL faster and more easily.

Installing

You can follow this document from Spark to install it. (It is not complex to install, so I really believe you can do it by yourself :D ).
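As a minimal sketch (assuming a pip-based install), the SparkSession below is the entry point used implicitly by all of the examples in this post; the application name is arbitrary:

```python
# After installing PySpark (for example with `pip install pyspark`),
# create a SparkSession as the entry point for the DataFrame and SQL APIs.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("etl-fundamentals").getOrCreate()
```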

Generic Load/Save Functions

In the simplest form, the default data source (parquet unless otherwise configured by spark.sql.sources.default) will be used for all operations.

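For instance, a minimal sketch of the default path (the file and column names are hypothetical):

```python
# No format given, so Spark uses spark.sql.sources.default (parquet by default).
df = spark.read.load("users.parquet")
df.select("name").write.save("names.parquet")  # written back out as parquet
```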

You can also manually specify the data source that will be used along with any extra options that you would like to pass to the data source. Data sources are specified by their fully qualified name (i.e., org.apache.spark.sql.parquet), but for built-in sources you can also use their short names (json, parquet, jdbc, orc, libsvm, csv, text). DataFrames loaded from any data source type can be converted into other types using this syntax.

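For example (file names and options are illustrative):

```python
# Specify the format explicitly and pass extra options to the source.
df_json = spark.read.load("people.json", format="json")
df_csv = spark.read.load("movies.csv", format="csv",
                         header="true", inferSchema="true")

# A DataFrame loaded from one source type can be written out as another.
df_csv.write.save("movies.parquet", format="parquet")
```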

What's more, Spark can load data from many kinds of sources, not only local files: AWS S3, GCS, Redshift, Athena, SFTP, Google BigQuery …. all through the same magic line (spark.read.load) 💁, so amazing….
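For instance, once the right connector (such as hadoop-aws for S3) is configured, the call looks the same; the bucket and path below are made up:

```python
# Same API, different location; needs the matching connector on the classpath.
df_s3 = spark.read.load("s3a://my-bucket/movies/", format="parquet")
```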

Actually, we can also load, extract, and query data in a single step. But normally I don't recommend it, because it is not easy to trace back and validate the data. Example:

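The original screenshot is not available; one way to do this in Spark SQL is to query a file directly instead of loading it into a DataFrame first (the path is hypothetical):

```python
# Read and query in one step; harder to trace back and validate intermediate data.
df = spark.sql("SELECT * FROM parquet.`data/users.parquet`")
```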

OK, now we will read our movie dataset so we can go into the details.

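Assuming the Kaggle download is a CSV file named movies.csv (the exact file name and schema may differ), the read might look like this; the resulting DataFrame, called movies here, is reused in the examples below:

```python
movies = spark.read.load("movies.csv", format="csv",
                         header="true", inferSchema="true")
movies.show(5)
```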

It seems OK. Now we will go to the main part of this fundamental topic.

Top 100 PySpark Features

pyspark.sql.DataFrame

1. show(n) : Prints the first n rows of this DataFrame to the console.

2. collect() : Returns all the records as a list of Row.


3. columns (property) : Returns all column names as a list.


4. count() : Returns the number of rows in this DataFrame.

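A minimal sketch of these first four calls on the movie DataFrame (the movies name comes from the read example above; printed column names will depend on the actual file):

```python
movies.show(5)           # print the first 5 rows to the console
rows = movies.collect()  # all rows as a list of Row (avoid on very large data)
print(movies.columns)    # list of column names
print(movies.count())    # number of rows
```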

5. describe() : Computes basic statistics for numeric and string columns.
This includes count, mean, stddev, min, and max. If no columns are given, this function computes statistics for all numerical or string columns.
Actually, this is not the best example, because this dataset has no numeric columns, but I think you can still understand it easily.


6. distinct() : Returns a new DataFrame containing the distinct rows in this DataFrame.
You can see in the example that the number of records is reduced, because the distinct method drops duplicate rows from the original DataFrame.


7. drop(*cols) : Returns a new DataFrame that drops the specified column. This is a no-op if schema doesn’t contain the given column name(s).
For example, you can see that the 3 columns ['Director', 'Genre', 'Cast'] were deleted.

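A short sketch of describe, distinct, and drop together (the dropped columns match the ones named above):

```python
movies.describe().show()                    # count / mean / stddev / min / max per column
print(movies.distinct().count())            # row count after removing duplicates
slim = movies.drop('Director', 'Genre', 'Cast')
print(slim.columns)                         # the three columns are gone
```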

8. dropDuplicates() : Returns a new DataFrame with duplicate rows removed, optionally only considering certain columns.


9. dropna(how=’any’, thresh=None, subset=None) : Returns a new DataFrame omitting rows with null values.

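For example (the 'Title' column used for deduplication is an assumption about the dataset):

```python
# Keep one row per Title (an assumed column); with no arguments, all columns are compared.
deduped = movies.dropDuplicates(['Title'])

# Drop rows that have a null in any of the listed columns.
clean = movies.dropna(how='any', subset=['Director', 'Genre'])
```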

10. explain() : Prints the (logical and physical) plans to the console for debugging purposes.
Actually, to become a PySpark ETL expert, you have to understand this feature well. It shows how complex your query is, and you can optimize the query's performance based on it. I will discuss this in a dedicated advanced topic. Here, I only show you an example:

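A small sketch of what to call (the filter itself is just an arbitrary query to produce a plan):

```python
query = movies.filter(movies['Genre'] == 'Drama').select('Director')
query.explain()      # physical plan only
query.explain(True)  # logical plans plus the physical plan
```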

11. fillna() : Replaces null values; alias for na.fill().


12. filter(condition) : Filters rows using the given condition.


13. first() : Returns the first row as a Row.

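A combined sketch of fillna, filter, and first (the replacement value is arbitrary):

```python
filled = movies.fillna({'Director': 'unknown'})      # replace nulls in one column
dramas = movies.filter(movies['Genre'] == 'Drama')   # keep matching rows only
print(dramas.first())                                # a single Row object
```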

14. foreach(f) : Applies the function f to each Row of this DataFrame.


15. groupBy(*cols) : Groups the DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.


16. head(n) : Returns the first n rows.

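A short sketch of foreach, groupBy, and head; the foreach function here is deliberately a no-op, since its side effects run on the executors, not the driver:

```python
movies.foreach(lambda row: None)     # runs once per row on the executors

per_genre = movies.groupBy('Genre').count()
per_genre.show()

print(movies.head(3))                # first 3 rows as a list of Row
```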

17. join(): Joins with another DataFrame, using the given join expression.
Actually, it is quite similar to a SQL JOIN or a Pandas merge, so I only show you a simple example. (But we will talk about how to optimize joins in the next topic: advanced PySpark.)


18. limit() : Limits the result count to the number specified.


19. orderBy(*cols, **kwargs) : Returns a new DataFrame sorted by the specified column(s).

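A minimal sketch of join, limit, and orderBy; the second DataFrame and the sort column 'Title' are made up for illustration:

```python
directors = movies.select('Director').distinct()            # a small lookup-style DataFrame
joined = movies.join(directors, on='Director', how='inner')

top10 = movies.limit(10)
by_title = movies.orderBy('Title', ascending=True)          # 'Title' is an assumed column
```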

20. printSchema(): Prints out the schema in the tree format.


21. repartition() : Returns a new DataFrame partitioned by the given partitioning expressions. The resulting DataFrame is hash partitioned.
One of the ways to improve the performance of an ETL query is partitioning. If you can choose a good number of partitions, your performance will be better. (More on this in the advanced topic.)

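For example:

```python
movies.printSchema()                        # column names and types as a tree

repartitioned = movies.repartition(8)       # target a fixed number of partitions
by_genre = movies.repartition('Genre')      # or hash-partition by a column
print(repartitioned.rdd.getNumPartitions())
```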

22. replace() : Returns a new DataFrame replacing a value with another value.


23. rollup(*cols) : Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them.


24. select() : Projects a set of expressions and returns a new DataFrame.

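A combined sketch of replace, rollup, and select (the genre values being replaced are illustrative):

```python
# Replace a literal value wherever it appears in the given column(s).
fixed = movies.replace('Sci-Fi', 'Science Fiction', subset=['Genre'])

# Rollup: counts per (Genre, Director), plus subtotals per Genre and a grand total.
movies.rollup('Genre', 'Director').count().show()

movies.select('Genre', 'Director').show(5)
```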

25. selectExpr() : Projects a set of SQL expressions and returns a new DataFrame.

26. sort(*cols, **kwargs) : Returns a new DataFrame sorted by the specified column(s).


27. summary(*statistics) : Computes specified statistics for numeric and string columns. Available statistics are: count, mean, stddev, min, max, and arbitrary approximate percentiles specified as a percentage (e.g., 75%).

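A short sketch covering selectExpr, sort, and summary together:

```python
movies.selectExpr('Genre', 'upper(Director) AS director_upper').show(5)
movies.sort(movies['Genre'].desc()).show(5)
movies.summary('count', 'min', 'max', '75%').show()
```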

28. toPandas() : Returns the contents of this DataFrame as Pandas pandas.DataFrame. This is only available if Pandas is installed and available.


29. union() : Returns a new DataFrame containing the union of rows in this and another DataFrame.

30. unpersist() : Marks the DataFrame as non-persistent and removes all blocks for it from memory and disk. Normally, when you use PySpark for a pipeline, this feature is useful for clearing memory after each part. Remember it if you run into memory problems.
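A hedged sketch of these three calls; movies_2.csv is a hypothetical second file with the same schema, which union requires positionally:

```python
pdf = movies.limit(100).toPandas()     # pull a small sample to the driver as pandas

more = spark.read.load("movies_2.csv", format="csv",
                       header="true", inferSchema="true")
all_movies = movies.union(more)        # columns are matched by position, not by name

movies.cache()
# ... repeated use of `movies` ...
movies.unpersist()                     # free the cached blocks when done
```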

31. withColumn() : Returns a new DataFrame by adding a column or replacing the existing column that has the same name.


32. withColumnRenamed() : Returns a new DataFrame by renaming an existing column. This is a no-op if schema doesn’t contain the given column name.

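For example (the new column and names are illustrative):

```python
from pyspark.sql import functions as F

flagged = movies.withColumn('is_drama', F.col('Genre') == 'Drama')
renamed = movies.withColumnRenamed('Director', 'director_name')
```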

pyspark.sql.GroupedData

A set of methods for aggregations on a DataFrame, created by DataFrame.groupBy().

33. agg() : Computes aggregates and returns the result as a DataFrame.


34. apply(udf) : Maps each group of the current DataFrame using a pandas UDF and returns the result as a DataFrame. The user-defined function should take a pandas.DataFrame and return another pandas.DataFrame. For each group, all columns are passed together as a pandas.DataFrame to the user function, and the returned pandas.DataFrames are combined into a DataFrame.

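A sketch of agg and apply on the movie data; the grouped-map pandas UDF requires pyarrow, the per-genre director count is just an illustrative computation, and newer Spark versions prefer GroupedData.applyInPandas for the same pattern:

```python
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType

# agg: one aggregate row per genre.
movies.groupBy('Genre').agg(F.count('*').alias('n_movies')).show()

# apply: the schema string must describe the DataFrame the function returns.
@pandas_udf('Genre string, n_directors long', PandasUDFType.GROUPED_MAP)
def directors_per_genre(pdf):
    return pd.DataFrame({'Genre': [pdf['Genre'].iloc[0]],
                         'n_directors': [pdf['Director'].nunique()]})

movies.groupBy('Genre').apply(directors_per_genre).show()
```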

35. avg(), count(), max(), mean(), min(), sum()
These are really simple functions to apply after groupBy, so I only give you an example with count():


36. pivot() : Pivots a column of the current DataFrame and performs the specified aggregation. There are two versions of the pivot function: one that requires the caller to specify the list of distinct values to pivot on, and one that does not. The latter is more concise but less efficient, because Spark needs to first compute the list of distinct values internally.

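For example (the explicit genre values in the second pivot are made up):

```python
movies.groupBy('Genre').count().show()

# One row per Director, one column per Genre, counting movies in each cell.
movies.groupBy('Director').pivot('Genre').count().show(5)

# More efficient variant: list the pivot values yourself.
movies.groupBy('Director').pivot('Genre', ['Drama', 'Comedy']).count().show(5)
```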

pyspark.sql.Column

37. alias(): Returns this column aliased with a new name or names (in the case of expressions that return more than one column, such as explode).


38. asc() : Returns a sort expression based on ascending order of the column.


39. between() : A boolean expression that is evaluated to true if the value of this expression is between the given lower and upper bounds (inclusive).


40. cast(dataType): Convert the column into type dataType.

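A combined sketch of alias, asc, between, and cast; the 'Year' column is an assumption, since between and cast are clearer on numeric data:

```python
from pyspark.sql import functions as F

movies.select(F.col('Director').alias('director_name')).show(5)
movies.orderBy(F.col('Director').asc()).show(5)

# Cast to int first, then keep rows whose year falls in an inclusive range.
movies.filter(F.col('Year').cast('int').between(2000, 2010)).show(5)
```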

41. contains() : Contains the other element. Returns a boolean Column based on a string match.


42. desc() : Returns a sort expression based on the descending order of the column.


43. endswith(): String ends with. Returns a boolean Column based on a string match.


44. isNotNull() : True if the current expression is NOT null.

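For example (the 'Title' column and the matched strings are illustrative):

```python
from pyspark.sql import functions as F

movies.filter(F.col('Title').contains('Love')).show(5)
movies.orderBy(F.col('Director').desc()).show(5)
movies.filter(F.col('Title').endswith('II')).show(5)
print(movies.filter(F.col('Director').isNotNull()).count())
```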

45. isNull() : True if the current expression is null. (Equivalent to ~isNotNull().)


46. isin(*) : A boolean expression that is evaluated to true if the value of this expression is contained by the evaluated values of the arguments.


47. like() : SQL like expression. Returns a boolean Column based on a SQL LIKE match.

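A short sketch of isNull, isin, and like (values and patterns are illustrative):

```python
from pyspark.sql import functions as F

print(movies.filter(F.col('Director').isNull()).count())
movies.filter(F.col('Genre').isin('Drama', 'Comedy')).show(5)
movies.filter(F.col('Title').like('%War%')).show(5)   # 'Title' is an assumed column
```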

48. otherwise(): Evaluates a list of conditions and returns one of multiple possible result expressions. If Column.otherwise() is not invoked, None is returned for unmatched conditions.


49. over() : Define a windowing column.


50. startswith() : String starts with. Returns a boolean Column based on a string match. It works like endswith(), so I will not give you a separate example.
You can practice and try some queries like mine here.
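To round off otherwise() and over(), here is a hedged sketch: a CASE WHEN style column and a per-genre ranking window (the ordering column 'Title' is an assumption):

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# when/otherwise: behaves like SQL CASE WHEN ... ELSE ...
labelled = movies.withColumn(
    'genre_group',
    F.when(F.col('Genre') == 'Drama', 'serious').otherwise('other'))

# over: rank rows within each genre.
w = Window.partitionBy('Genre').orderBy('Title')
ranked = movies.withColumn('rank_in_genre', F.row_number().over(w))
ranked.show(5)
```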

I think this is enough to cover basic ETL with PySpark. You will use most of them a lot. I hope this fundamental topic is useful for you. (Clap for this blog if you want to =))) ).

See you in the advanced ETL with PySpark topic.

Content from: https://medium.com/@minhtc.uet/top-100-pyspark-features-master-etl-with-pyspark-1-fundamental-310c72938d14
