我正在使用 spark 2.4.5,我需要根据df2(情感词典)中的单词和分数从df1的标记列表(MeaningfulWords 列)中计算情感分数。在 Df1 中,我必须创建一个包含标记分数列表的新列,以及另一个包含每条记录的平均情绪(分数总和/总单词)的列。
数据框如下所示:
df1.select("ID","MeaningfulWords").show(truncate=True, n=5)
+------------------+------------------------------+
| ID| MeaningfulWords|
+------------------+------------------------------+
|abcde00000qMQ00001|[casa, alejado, buen, gusto...|
|abcde00000qMq00002|[clientes, contentos, servi...|
|abcde00000qMQ00003| [resto, bien]|
|abcde00000qMQ00004|[mal, servicio, no, antiend...|
|abcde00000qMq00005|[gestion, adecuada, proble ...|
+------------------+------------------------------+
df2.show(5)
+-----+----------+
|score| word|
+-----+----------+
| 1.68|abandonado|
| 3.18| abejas|
| 2.8| aborto|
| 2.46| abrasador|
| 8.13| abrazo|
+-----+----------+
新列的结果应该是这样的:
+------------------+---------------------+
| MeanScore| ScoreList|
+------------------+---------------------+
| 2.95|[3.10, 2.50, 1.28,...|
| 2.15|[1.15, 3.50, 2.75,...|
| 2.75|[4.20, 1.00, 1.75,...|
| 3.25|[3.25, 2.50, 3.20,...|
| 3.15|[2.20, 3.10, 1.28,...|
+------------------+---------------------+
我已经使用 .join 查看了几个选项,但是在处理列之间的不同数据类型时,它会出错。
我已经检查了诸如https://stackoverflow.com/questions/36576196/joining-pyspark-dataframes-on-nested-field之类的选项,但是我无法在两列之间进行直接连接,因为它们具有不同的数据类型。
我还尝试将 Df 转换为 RDD 并使用函数,如下所示:
def map_words_to_values(review_words, afinn_dict):
return [afinn_dict[word] for word in review_words if word in afinn_dict]
RDD1=swRemoved.rdd.map(list)
RDD2=Dict_df.rdd.map(list)
reviewsRDD_afinn_values = RDD1.map(lambda tupple: (tupple[0], map_words_to_values(tupple[1], RDD2)))
reviewsRDD_afinn_values.take(3)
但是使用最后一个选项,我收到以下错误:
PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
我知道如何用 pandas 解决它,但我想找到正确的方法来用 spark 解决它而不影响性能。
他们在https://stackoverflow.com/questions/61687997/calculate-new-column-in-spark-dataframe-crossing-a-tokens-list-column-in-df1-wi解决了我的问题:
您可以先使用
join
usingarray_contains(MeaningfulWords,word)
,然后groupBy
从collect_list
他们所做的所有单词join
中执行此操作,然后使用高阶函数transform
并aggregate
获得平均分数(在 spark2.4+ 中有效)。高阶函数
aggregate
只接受整数值,因此必须transform
使用转换,最后除以 100(假设最多 2 个小数位,例如 2.81)。