I'm using Spark 2.4.5 and I need to calculate a sentiment score from a list of tokens (the MeaningfulWords column) in df1, based on the words and scores in df2 (a sentiment dictionary). In df1 I must create a new column with the list of token scores and another column with the average sentiment (sum of scores / total words) of each record.
The dataframes look like this:
df1.select("ID","MeaningfulWords").show(truncate=True, n=5)
+------------------+------------------------------+
| ID| MeaningfulWords|
+------------------+------------------------------+
|abcde00000qMQ00001|[casa, alejado, buen, gusto...|
|abcde00000qMq00002|[clientes, contentos, servi...|
|abcde00000qMQ00003| [resto, bien]|
|abcde00000qMQ00004|[mal, servicio, no, antiend...|
|abcde00000qMq00005|[gestion, adecuada, proble ...|
+------------------+------------------------------+
df2.show(5)
+-----+----------+
|score| word|
+-----+----------+
| 1.68|abandonado|
| 3.18| abejas|
| 2.8| aborto|
| 2.46| abrasador|
| 8.13| abrazo|
+-----+----------+
The result of the new columns should be something like this:
+------------------+---------------------+
| MeanScore| ScoreList|
+------------------+---------------------+
| 2.95|[3.10, 2.50, 1.28,...|
| 2.15|[1.15, 3.50, 2.75,...|
| 2.75|[4.20, 1.00, 1.75,...|
| 3.25|[3.25, 2.50, 3.20,...|
| 3.15|[2.20, 3.10, 1.28,...|
+------------------+---------------------+
I have reviewed several options using .join, including the approach in https://stackoverflow.com/questions/36576196/joining-pyspark-dataframes-on-nested-field , but I can't do a direct join between the two columns because they have different data types (an array of strings versus a string), and it gives an error.
I've also tried converting the DataFrames to RDDs and mapping a function over them, like so:
def map_words_to_values(review_words, afinn_dict):
    return [afinn_dict[word] for word in review_words if word in afinn_dict]

RDD1 = swRemoved.rdd.map(list)
RDD2 = Dict_df.rdd.map(list)
# RDD2 is referenced inside the map() below, i.e. inside a transformation
reviewsRDD_afinn_values = RDD1.map(lambda tup: (tup[0], map_words_to_values(tup[1], RDD2)))
reviewsRDD_afinn_values.take(3)
But with this last option I get the following error:
PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
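From what I've read, the standard way around SPARK-5063 is to stop referencing RDD2 inside the transformation altogether, e.g. by collecting the (small) dictionary to the driver and broadcasting it as a plain Python dict. A minimal sketch of that idea, assuming df2 fits in driver memory and that spark is the active SparkSession:
# Broadcast the dictionary as a plain dict so the map() below never
# references another RDD (which is what triggers SPARK-5063).
afinn_map = Dict_df.rdd.map(lambda r: (r["word"], r["score"])).collectAsMap()
bc_dict = spark.sparkContext.broadcast(afinn_map)

reviewsRDD_afinn_values = swRemoved.rdd.map(
    lambda row: (row["ID"], map_words_to_values(row["MeaningfulWords"], bc_dict.value)))
reviewsRDD_afinn_values.take(3)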
That said, I know how to solve this with pandas, but I would like to find the correct way to do it in Spark without sacrificing performance.
My problem was solved at https://stackoverflow.com/questions/61687997/calculate-new-column-in-spark-dataframe-crossing-a-tokens-list-column-in-df1-wi :
You can do this first with a join using array_contains(MeaningfulWords, word), then a groupBy with collect_list of all the words that joined, and finally the higher-order functions transform and aggregate to get the mean score (valid in Spark 2.4+). The higher-order function aggregate only accepts integer values, so transform was needed to convert the scores to integers (scaling by 100), dividing back by 100 at the end (assuming a maximum of 2 decimal places, e.g. 2.81).
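A sketch of that approach, reconstructed from the description above (the exact code is in the linked answer) and using the column names from the examples:
from pyspark.sql import functions as F

# Attach a score to every token that appears in the dictionary, regroup per
# review, then average with the SQL higher-order functions (Spark 2.4+).
result = (df1
    .join(df2, F.expr("array_contains(MeaningfulWords, word)"), "left")
    .groupBy("ID", "MeaningfulWords")
    .agg(F.collect_list("score").alias("ScoreList"))
    .withColumn(
        "MeanScore",
        # aggregate's accumulator takes its type from the start value (0, an
        # integer), so the scores are scaled to ints first and rescaled after.
        F.expr("""aggregate(transform(ScoreList, x -> int(x * 100)),
                            0, (acc, x) -> acc + x)
                  / (size(ScoreList) * 100)""")))
result.select("ID", "MeanScore", "ScoreList").show(5)
The left join keeps reviews whose tokens match nothing in the dictionary; for those rows ScoreList comes out empty and MeanScore is null, which may need separate handling.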