Created
February 22, 2021 17:28
Spark NLP + Spark ML Transformer
Vivek Gupta Sep 2nd, 2020 at 10:02 AM
I am new to Spark NLP. I am writing a custom transformer that removes tokens of length <= 2 from text. The transformer works and does its job, but its output does not have the proper structure: it returns only an array of strings. I am struggling to get the output in the following structure:
ArrayType(
    StructType([
        StructField("annotatorType", StringType(), False),
        StructField("begin", IntegerType(), False),
        StructField("end", IntegerType(), False),
        StructField("result", StringType(), False),
        StructField("metadata", MapType(StringType(), StringType()), True)
    ])
)
Currently I am getting the following output:
+--------------------------------------------------------------------------------+
|                                                                   modified_text|
+--------------------------------------------------------------------------------+
|[person, agree, with, results, important, for, her|
|[pef, are, not, available, this, province, the, mainly...
The following is the code for the custom transformer:
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Tokenizer, NGram
from typing import Iterable
from pyspark.sql.types import *
from pyspark.sql.functions import col, explode, count, udf  # udf added: it is used below
from pyspark.sql import DataFrame
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param

class ShortTokenRemover(Transformer, HasInputCol, HasOutputCol):
    def __init__(self):
        super(ShortTokenRemover, self).__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        def remove_token_short_length(inp):
            # Keep only tokens longer than 2 characters.
            inp = [token for token in inp if len(token) > 2]
            return inp

        # The UDF is declared to return plain strings, so the annotation structure is lost.
        udf_remove_token_short_length = udf(remove_token_short_length, ArrayType(StringType()))
        out_col_name = self.getOutputCol()
        in_col = df[self.getInputCol()]
        # Only the `result` field of each annotation is passed to the UDF.
        df = df.withColumn(out_col_name, udf_remove_token_short_length(in_col.result))
        return df
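The gap between the two output shapes can be illustrated without Spark. The following is a minimal pure-Python sketch; the `Annotation` namedtuple and both function names are hypothetical stand-ins, with field names taken from the schema quoted above. It shows that filtering the `result` strings discards offsets and metadata, while filtering whole annotations keeps them.

```python
from collections import namedtuple

# Hypothetical stand-in for one Spark NLP annotation row (fields from the schema above).
Annotation = namedtuple("Annotation", ["annotatorType", "begin", "end", "result", "metadata"])


def drop_short_strings(results, min_len=3):
    """Filter plain strings: this is all a UDF sees when it is given `col.result`."""
    return [r for r in results if len(r) >= min_len]


def drop_short_annotations(annotations, min_len=3):
    """Filter on `result` but return the full annotation, so begin/end/metadata survive."""
    return [a for a in annotations if len(a.result) >= min_len]


tokens = [
    Annotation("token", 0, 1, "my", {"sentence": "0"}),
    Annotation("token", 3, 8, "doctor", {"sentence": "0"}),
]

print(drop_short_strings([t.result for t in tokens]))
print(drop_short_annotations(tokens))
```

The second function mirrors what a structure-preserving UDF would need to do: receive the whole annotation column and declare an array-of-struct return type.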
24 replies
Maziyar 6 months ago
The Tokenizer in Spark NLP comes with minLength and maxLength parameters, so there is no need to create a transformer. A custom transformer would be pure Spark ML, and you would then have to integrate it into the Spark NLP pipeline, which is not required here: https://nlp.johnsnowlabs.com/api/#com.johnsnowlabs.nlp.annotators.Tokenizer
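A rough sketch of that suggestion follows. The function name `build_tokenizer` is hypothetical, the column names are the ones used later in this thread, and it assumes `minLength` is an inclusive lower bound (so 3 would drop tokens of length <= 2); that is worth checking against the Spark NLP version in use.

```python
def build_tokenizer(min_length=3):
    """Return a Spark NLP Tokenizer configured to drop tokens shorter than `min_length`."""
    # Imported inside the function so it can be defined before Spark NLP is loaded.
    from sparknlp.annotator import Tokenizer

    return (
        Tokenizer()
        .setInputCols(["document"])
        .setOutputCol("tokenized")
        .setMinLength(min_length)
    )
```

Because the filtering happens inside a Spark NLP annotator, the output column keeps its annotation structure and downstream annotators can consume it directly.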
Vivek Gupta 6 months ago
I have created the transformer and it does the job. When I integrate this transformer into the pipeline, it gives the following error:
Py4JJavaError: An error occurred while calling o1359.transform.
: java.lang.IllegalArgumentException: requirement failed: Wrong or missing inputCols annotators in StopWordsCleaner_61f3ca02188e.
Current inputCols: short_token_filtered. Dataset's columns:
(column_name=Text_en,is_nlp_annotator=false)
(column_name=Text_en_proc,is_nlp_annotator=false)
(column_name=Text_en_junk,is_nlp_annotator=false)
(column_name=Text_en_New,is_nlp_annotator=false)
(column_name=document,is_nlp_annotator=true,type=document)
(column_name=tokenized,is_nlp_annotator=true,type=token)
(column_name=lemmatized,is_nlp_annotator=true,type=token)
(column_name=short_token_filtered,is_nlp_annotator=false).
Make sure such annotators exist in your pipeline, with the right output names and that they have following annotator types: token
    at scala.Predef$.require(Predef.scala:224)
I have created the following pipeline:
pipeline = Pipeline() \
    .setStages([documentAssembler,
                tokenizer,
                lemmatizer,
                short_token_filter,
                stopwords_cleaner,
                pos_tagger,
                ngrammer,
                finisher])
I checked the transformer output; it has annotator type token:
+--------------------------------------------------------------------------------+
|                                                            short_token_filtered|
+--------------------------------------------------------------------------------+
|[[token, 0, 5, doctor, [sentence -> 0], []], [token, 7, 11, agree, [sentence ...|
|[[token, 0, 9, spirometer, [sentence -> 0], []], [token, 13, 15, pef, [senten...|
+--------------------------------------------------------------------------------+
I am not sure why I am getting this error.
Maziyar 6 months ago
Could you please show your entire pipeline, and also the schema of the DataFrame? The column may say token without actually being AnnotatorType.TOKEN.
Vivek Gupta 6 months ago
Please find the DataFrame schema below. The error was reported for the transformer that produced the 'short_token_filtered' field.
root
 |-- Id: integer (nullable = true)
 |-- Text_en: string (nullable = true)
 |-- Text_en_proc: string (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- tokenized: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- short_token_filtered: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = false)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = false)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = true)
Vivek Gupta 6 months ago
To focus on the specific issue, I have shortened the pipeline. It is now:
pipeline = Pipeline() \
    .setStages([documentAssembler,
                tokenizer,
                short_token_filter,
                lemmatizer,
                finisher])
Maziyar 6 months ago
Thanks. I meant the whole pipeline code, not just the stages. I would like to see what the inputs are for each annotator.
Vivek Gupta 6 months ago
from sparknlp.base import DocumentAssembler
text_col = 'Text_en_New'
documentAssembler = DocumentAssembler() \
    .setInputCol(text_col) \
    .setOutputCol('document') \
    .setCleanupMode("shrink")
from sparknlp.annotator import Tokenizer
tokenizer = Tokenizer() \
    .setInputCols(['document']) \
    .setOutputCol('tokenized')
short_token_filter = ShortTokenFilter() \
    .setInputCol('tokenized') \
    .setOutputCol('short_token_filtered')
from sparknlp.annotator import LemmatizerModel
lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(['short_token_filtered']) \
    .setOutputCol('lemmatized')
from sparknlp.base import Finisher
finisher = Finisher() \
    .setInputCols(['tokenized', 'lemmatized'])
from pyspark.ml import Pipeline
pipeline = Pipeline() \
    .setStages([documentAssembler,
                tokenizer,
                short_token_filter,
                lemmatizer,
                finisher])
Maziyar 6 months ago
Your error is talking about StopWordsCleaner, which I don't see in your pipeline:
Py4JJavaError: An error occurred while calling o1359.transform.
: java.lang.IllegalArgumentException: requirement failed: Wrong or missing inputCols annotators in StopWordsCleaner_61f3ca02188e.
If you have all of this in a working notebook, I can give it a shot. It requires a detailed evaluation of all the code to see what goes wrong.
Vivek Gupta 6 months ago
Sharing the error details. I have reduced the pipeline so that I can focus on the error. Below are the error details:
Py4JJavaError: An error occurred while calling o828.transform.
: java.lang.IllegalArgumentException: requirement failed: Wrong or missing inputCols annotators in LEMMATIZER_c62ad8f355f9.
Current inputCols: short_token_filtered. Dataset's columns:
(column_name=Text_en,is_nlp_annotator=false)
(column_name=Text_en_proc,is_nlp_annotator=false)
(column_name=Text_en_junk,is_nlp_annotator=false)
(column_name=Text_en_New,is_nlp_annotator=false)
(column_name=document,is_nlp_annotator=true,type=document)
(column_name=tokenized,is_nlp_annotator=true,type=token)
(column_name=short_token_filtered,is_nlp_annotator=false).
Make sure such annotators exist in your pipeline, with the right output names and that they have following annotator types: token
    at scala.Predef$.require(Predef.scala:224)
    at com.johnsnowlabs.nlp.AnnotatorModel._transform(AnnotatorModel.scala:43)
    at com.johnsnowlabs.nlp.AnnotatorModel.transform(AnnotatorModel.scala:79)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
IllegalArgumentException                  Traceback (most recent call last)
<command-4262523904952880> in <module>
----> 1 processed_review = pipeline.fit(review_text).transform(review_text)
/databricks/spark/python/pyspark/ml/base.py in transform(self, dataset, params)
    171                 return self.copy(params)._transform(dataset)
    172             else:
--> 173                 return self._transform(dataset)
    174         else:
    175             raise ValueError("Params must be a param map but got %s." % type(params))
/databricks/spark/python/pyspark/ml/pipeline.py in _transform(self, dataset)
    260     def _transform(self, dataset):
    261         for t in self.stages:
--> 262             dataset = t.transform(dataset)
    263         return dataset
Maziyar 6 months ago
Thanks. I think this requires debugging; it's hard to say what goes wrong, since the code and the transformer look OK. Could you please provide an end-to-end notebook that reproduces this error? We can take a closer look at it.
Vivek Gupta 6 months ago
Please find the notebook attached.
jsl_poc-5.ipynb (23 kB)
Maziyar 6 months ago
@Vivek Gupta We are working on it. It's actually a missing annotatorType in the column metadata, which should be added by PySpark itself, and the error also indicates that. I'll let you know once we have found the line to be added to your schema in the transformer.
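One way to see the difference described here is to look at the field metadata of each column rather than at the `annotatorType` value inside the rows. The following is a minimal sketch: the helper name `annotator_types` is hypothetical, and the stand-in schema only mimics the `fields`, `name`, and `metadata` attributes of a PySpark schema. With a real DataFrame one would pass `df.schema`.

```python
from types import SimpleNamespace


def annotator_types(schema):
    """Map each column name to the `annotatorType` in its field metadata (None if absent)."""
    return {f.name: (f.metadata or {}).get("annotatorType") for f in schema.fields}


# Stand-in for df.schema, mirroring the situation reported in the error message.
fake_schema = SimpleNamespace(fields=[
    SimpleNamespace(name="tokenized", metadata={"annotatorType": "token"}),
    SimpleNamespace(name="short_token_filtered", metadata={}),
])

print(annotator_types(fake_schema))
```

A column whose entry is `None` would be reported as `is_nlp_annotator=false`, even if every row in it carries `annotatorType = "token"`.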
Vivek Gupta 6 months ago
Thanks. Looking forward to a solution.
Vivek Gupta 5 months ago
Hi @Maziyar,
can you help to resolve this issue?
Maziyar 5 months ago
Yes, sorry for the delay. Right after today's release I will sit down and look into what that metadata schema from Spark ML is.
Vivek Gupta 5 months ago
Hi @Maziyar,
can you help to resolve this issue?
Andres Fernandez 5 months ago
Hi @Vivek Gupta, @Maziyar, @Jiri Dobes, please find below a way to make a transformer compatible with Spark NLP. Pay attention to the withMeta free function and the meta variable (you need to use valid Spark NLP annotator types as column metadata: https://github.com/JohnSnowLabs/spark-nlp/blob/master/src/main/scala/com/johnsnowlabs/nlp/AnnotatorType.scala)
src/main/scala/com/johnsnowlabs/nlp/AnnotatorType.scala
package com.johnsnowlabs.nlp
object AnnotatorType {
  val DOCUMENT = "document"
  val TOKEN = "token"
Andres Fernandez 5 months ago
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Tokenizer, NGram
from typing import Iterable
from pyspark.sql.types import *
from pyspark.sql.functions import col, explode, count, udf
from pyspark.sql import DataFrame, Column
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml import PipelineModel
import json
from sparknlp.annotator import *

def withMeta(self, alias, meta):
    # Alias the column and attach `meta` as column metadata through the JVM Column.as(alias, metadata).
    sc = SparkContext._active_spark_context
    jmeta = sc._gateway.jvm.org.apache.spark.sql.types.Metadata
    return Column(getattr(self._jc, "as")(alias, jmeta.fromJson(json.dumps(meta))))

# Schema of a single Spark NLP annotation.
dataType = StructType([
    StructField('annotatorType', StringType(), False),
    StructField('begin', IntegerType(), False),
    StructField('end', IntegerType(), False),
    StructField('result', StringType(), False),
    StructField('metadata', MapType(StringType(), StringType()), False),
    StructField('embeddings', ArrayType(FloatType()), False)
])

class ShortTokenRemover(Transformer, HasInputCol, HasOutputCol):
    def __init__(self):
        super(ShortTokenRemover, self).__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        def remove_token_short_length(inp):
            # Filter on the token text but keep the whole annotation.
            inp = [token for token in inp if len(token.result) > 4]
            return inp

        # Return an array of annotation structs instead of plain strings.
        udf_remove_token_short_length = udf(remove_token_short_length, ArrayType(dataType))
        out_col_name = self.getOutputCol()
        in_col = df[self.getInputCol()]
        Column.withMeta = withMeta
        # Column metadata that marks the output as a Spark NLP token column.
        meta = {"annotatorType": "token"}
        df = df.withColumn(out_col_name, udf_remove_token_short_length(in_col).withMeta("", meta))
        return df
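The `withMeta` helper above reaches into the JVM through py4j to alias the column with metadata. As a hedged alternative, PySpark's public `Column.alias` accepts a keyword-only `metadata` argument, which should attach the same `annotatorType` entry. The helper name `with_annotator_type` is hypothetical, and this sketch has not been run against the notebook from this thread.

```python
def with_annotator_type(column, alias, annotator_type="token"):
    """Alias `column` and tag it with `annotatorType` column metadata.

    `column` is expected to be a pyspark.sql.Column; only its public
    `alias(name, metadata=...)` method is used here.
    """
    return column.alias(alias, metadata={"annotatorType": annotator_type})
```

Inside `_transform`, this would take the place of the `withMeta` call, for example `df.withColumn(out_col_name, with_annotator_type(udf_remove_token_short_length(in_col), out_col_name))`, and it avoids monkey-patching `Column`.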
Vivek Gupta 5 months ago
@Andres Fernandez Thanks for providing the solution.
Vivek Gupta 5 months ago
@Andres Fernandez
sc = SparkContext._active_spark_context
In the above statement, where can I get SparkContext from?
Andres Fernandez 5 months ago
It depends on how you start the session; it is basically what you get back when you start it:
Andres Fernandez 5 months ago
spark = SparkSession.builder \
    .appName("Spark NLP Licensed") \
    .master("local[*]") \
    . . .
    .getOrCreate()
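To spell out the two usual routes: `SparkContext` is a class that can be imported from `pyspark`, and a `SparkSession` exposes its context as `spark.sparkContext`. A small sketch covering both follows; the function name `get_spark_context` is hypothetical.

```python
def get_spark_context(spark=None):
    """Return the SparkContext: from a SparkSession if one is given, else the active one."""
    if spark is not None:
        # A SparkSession exposes its context as a public attribute.
        return spark.sparkContext
    # Imported here so the function can be defined before PySpark is loaded.
    from pyspark import SparkContext

    # Same private attribute the withMeta helper in this thread relies on.
    return SparkContext._active_spark_context
```

With the session built above, `get_spark_context(spark)` would return the same object that `withMeta` looks up.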
Vivek Gupta 5 months ago
@Andres Fernandez
The transformer is working as expected now. The pipeline looks like this:
pipeline = Pipeline() \
    .setStages([documentAssembler,
                tokenizer,
                short_token_remover,
                lemmatizer,
                finisher])
The output of 'short_token_filtered' is not plain tokens; it is annotation objects.
Please let me know how to get the tokens rather than the objects.
+--------------------+--------------------+--------------------+-----------------------------+--------------------+
|                Text|short_token_filtered|  finished_tokenized|finished_short_token_filtered| finished_lemmatized|
+--------------------+--------------------+--------------------+-----------------------------+--------------------+
|The smartphone in...|[[token, 4, 13, s...|[The, smartphone,...|         [smartphone, indu...|[smartphone, indu...|
|The market has be...|[[token, 4, 9, ma...|[The, market, has...|         [market, rumours,...|[market, rumour, ...|
|With the M-series...|[[token, 9, 16, M...|[With, the, M-ser...|         [M-series, Samsun...|[M-series, Samsun...|
|Word on the stree...|[[token, 12, 17, ...|[Word, on, the, s...|         [street, #Meanest...|[street, #Meanest...|
+--------------------+--------------------+--------------------+-----------------------------+--------------------+
Andres Fernandez 5 months ago
Yeah, just use Spark SQL on the annotation column, like:
df.selectExpr("short_token_filtered.result").show()