Skip to content

Instantly share code, notes, and snippets.

@daniel-cortez-stevenson
Last active December 15, 2023 09:49
Show Gist options
  • Save daniel-cortez-stevenson/8758e4fe17cd81b819788119178badcd to your computer and use it in GitHub Desktop.
Wrapping Scala with Python for PySpark Example (org.apache.spark.mllib.feature.Stemmer)
"""An example of wrapping a Scala UDF with Python code.
Copyright 2020 Daniel Cortez Stevenson
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from pyspark.ml.wrapper import JavaTransformer
from pyspark import keyword_only, since
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.util import JavaMLReadable, JavaMLWritable
class HasLanguage(Params):
    """Mixin that adds a ``language`` param to an ML Params subclass.

    Mirrors the pyspark ``Has*`` shared-param mixin pattern: a class-level
    :class:`Param` descriptor plus getter/setter helpers.
    """

    # Class-level Param descriptor; string-typed (e.g. 'English', 'German').
    language = Param(
        Params._dummy(),
        'language',
        'English or German, etc.',
        typeConverter=TypeConverters.toString,
    )

    def __init__(self):
        super(HasLanguage, self).__init__()

    def setLanguage(self, value):
        """Set the value of :py:attr:`language` and return self for chaining."""
        return self._set(language=value)

    def getLanguage(self):
        """Return the current value of :py:attr:`language`."""
        return self.getOrDefault(self.language)
class SnowballStemmer(JavaTransformer, JavaMLReadable, JavaMLWritable,
                      HasInputCol, HasOutputCol, HasLanguage):
    """Python-wrapped Scala implementation of the Snowball stemmer.

    Wraps the JVM transformer ``org.apache.spark.mllib.feature.Stemmer``
    so it can be used as a stage in a :class:`pyspark.ml.Pipeline`. The
    Scala class must be on the Spark driver/executor classpath.

    NOTE: the upstream ``@since('1.3.0')`` decorator was removed — it
    falsely advertised this third-party wrapper as part of the pyspark
    1.3.0 public API by injecting a ``versionadded`` note into the
    docstring.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, language=None):
        """__init__(inputCol=None, outputCol=None, language=None)"""
        super(SnowballStemmer, self).__init__()
        # Instantiate the backing JVM object, tied to this stage's uid.
        self._java_obj = self._new_java_obj(
            'org.apache.spark.mllib.feature.Stemmer', self.uid)
        # @keyword_only stashes the caller's kwargs on self._input_kwargs.
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, language=None):
        """setParams(inputCol=None, outputCol=None, language=None)

        Set params for this SnowballStemmer; returns self for chaining.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)
@daniel-cortez-stevenson
Copy link
Author

daniel-cortez-stevenson commented May 1, 2020

Usage

import logging

import pyspark.sql.types as T
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.pipeline import Pipeline

from snowball_stemmer import SnowballStemmer


logger = logging.getLogger(__name__)


def main(spark, **kwargs):
    """Tokenize and stem two rows of German example text with an ML pipeline.

    :param spark: an active SparkSession used to create the DataFrame.
    """
    logger.info('Creating a simple DataFrame ...')
    column_names = ['id', 'german_text']
    schema = T.StructType(
        [T.StructField(name, T.StringType(), True) for name in column_names])
    rows = [
        ('abc', 'Hallo Herr Mustermann'),
        ('xyz', 'Deutsch ist das Ding!'),
    ]
    df = spark.createDataFrame(rows, schema)
    df.show()

    logger.info('Building the ML pipeline ...')
    # Whitespace-tokenize, then stem each token with the German Snowball rules.
    stages = [
        RegexTokenizer(inputCol='german_text', outputCol='tokens', pattern='\\s+'),
        SnowballStemmer(inputCol='tokens', outputCol='stemmed_tokens', language='German'),
    ]
    stemming_pipeline = Pipeline(stages=stages)

    logger.info('Running the stemming ML pipeline ...')
    stemmed_df = stemming_pipeline.fit(df).transform(df)
    stemmed_df.show()

Written for pyspark 2.4.x

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment