Ad Code

Data Science labs blog

Topic modelling with LDA (Latent Dirichlet Allocation) in PySpark

Topic modelling with LDA (Latent Dirichlet Allocation) in PySpark

Image for post
# --- Environment setup and data loading ---------------------------------
# Check that a SparkContext (`sc`) is already defined, e.g. by the pyspark
# shell or a notebook kernel.
print(sc.version)

# General-purpose libraries.
import os  # needed for os.path.realpath below (missing in the original)
import pandas as pd
import pyspark
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# Text-processing helpers.
from nltk.corpus import stopwords
import re

# Feature extraction (term frequency and inverse document frequency).
from pyspark.ml.feature import CountVectorizer, IDF

# Model building: RDD-based MLlib LDA implementation.
# NOTE(review): pyspark.mllib is the legacy RDD API; pyspark.ml.clustering.LDA
# is the DataFrame-based replacement — consider migrating.
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.mllib.clustering import LDA, LDAModel

# Read the reviews CSV into a DataFrame, inferring column types from the data.
data = sqlContext.read.format("csv") \
    .options(header='true', inferschema='true') \
    .load(os.path.realpath("Womens Clothing E-Commerce Reviews.csv"))
# --- Tokenization pipeline ----------------------------------------------
# Keep only the non-null review texts as an RDD of strings.
# NOTE(review): calling .map directly on a DataFrame only works on Spark < 2.0;
# on 2.x this must be data.rdd.map(...) — confirm the target Spark version.
reviews = data.map(lambda x: x['Review Text']).filter(lambda x: x is not None)

StopWords = stopwords.words("english")

# lowercase -> split on single spaces -> keep alphabetic tokens -> keep tokens
# longer than 3 chars -> drop English stopwords, then pair each token list
# with a stable document index for later DataFrame construction.
tokens = reviews \
    .map(lambda document: document.strip().lower()) \
    .map(lambda document: re.split(" ", document)) \
    .map(lambda words: [x for x in words if x.isalpha()]) \
    .map(lambda words: [x for x in words if len(x) > 3]) \
    .map(lambda words: [x for x in words if x not in StopWords]) \
    .zipWithIndex()
# --- TF-IDF features ----------------------------------------------------
df_txts = sqlContext.createDataFrame(tokens, ["list_of_words", 'index'])

# Term frequency: cap the vocabulary at 5000 terms, each of which must
# appear in at least 10 documents (minDF).
cv = CountVectorizer(inputCol="list_of_words", outputCol="raw_features",
                     vocabSize=5000, minDF=10.0)
cvmodel = cv.fit(df_txts)
result_cv = cvmodel.transform(df_txts)

# Inverse document frequency: reweight the raw term counts so that terms
# common to many documents contribute less.
idf = IDF(inputCol="raw_features", outputCol="features")
idfModel = idf.fit(result_cv)
result_tfidf = idfModel.transform(result_cv)
# --- LDA training -------------------------------------------------------
# Hyperparameters for the topic model.
num_topics = 10
max_iterations = 100

# MLlib's LDA.train consumes an RDD of [doc_id, feature_vector] pairs,
# which we obtain by projecting the TF-IDF frame onto its two columns.
corpus = result_tfidf[['index', 'features']].map(list)
lda_model = LDA.train(
    corpus,
    k=num_topics,
    maxIterations=max_iterations,
)
# --- Topic extraction ---------------------------------------------------
wordNumbers = 5  # top terms to display per topic

# Vocabulary learned by the CountVectorizer: term index -> term string.
# (The original referenced `vocabArray` without ever defining it.)
vocabArray = cvmodel.vocabulary

# Distribute the (term_indices, term_weights) description of each topic.
topicIndices = sc.parallelize(
    lda_model.describeTopics(maxTermsPerTopic=wordNumbers))

def topic_render(topic):
    """Map one topic's term indices to vocabulary words.

    `topic` is a (term_indices, term_weights) pair as produced by
    LDAModel.describeTopics; only the indices (topic[0]) are used.
    Relies on module-level `vocabArray` and `wordNumbers`.
    Returns a list of the topic's top `wordNumbers` terms as strings.
    """
    terms = topic[0]
    return [vocabArray[terms[i]] for i in range(wordNumbers)]
# --- Print the top terms of each topic ----------------------------------
topics_final = topicIndices.map(lambda topic: topic_render(topic)).collect()

for topic_idx, terms in enumerate(topics_final):
    print("Topic" + str(topic_idx) + ":")
    for term in terms:
        print(term)
    print('\n')
Reactions

Post a Comment

0 Comments