Email spam identification, categorization of news articles and the organization of web pages by search engines are modern-day examples of document classification. Document classification is the technique of systematically assigning a text document to one of a fixed set of categories; in other words, it is the process of tagging text documents. It is especially helpful when the amount of data is very large, particularly for organization, information filtering and storage purposes.
In this article, we will discuss an approach to implementing an end-to-end document classification pipeline using Apache Spark, with Scala as the core programming language. Apache Spark is an ideal choice when dealing with a large volume and variety of data. Its machine learning library, MLlib, is scalable, easy to deploy and can run workloads up to 100 times faster than Hadoop MapReduce.
Table of Contents
1. Document Pre-Processing
2. Initializing Apache Spark
3. Dataset Preparation
4. Creating the MLlib pipeline
5. Training the model
6. Prediction on Test Data
7. Conclusion and Next Steps
1. Document Pre-Processing
The first component of the pipeline is the pre-processing block, which removes noisy content from the documents. This includes cleaning URLs, punctuation, digits, short words, extra whitespace and English stopwords. Below are the Scala utility functions used for removing the various regular-expression patterns and custom word lists.
```
// Utility function to remove a particular regex pattern from text
def removeRegex(txt: String, flag: String): String = {
  val regex = RegexList.get(flag)
  var cleaned = txt
  regex match {
    case Some(value) =>
      // Collapse whitespace runs into a single space; replace other matches with a space
      if (flag.equals("white_space")) cleaned = txt.replaceAll(value, " ").trim
      else cleaned = txt.replaceAll(value, " ")
    case None => println("No regex flag matched")
  }
  cleaned
}

// Utility function to remove stopwords (or any custom word list) from text
def removeCustomWords(txt: String, flag: String): String = {
  var words = txt.split(" ")
  val stopwords = Stopwords.get(flag)
  stopwords match {
    case Some(value) => words = words.filter(x => !value.contains(x))
    case None => println("No stopword flag matched")
  }
  words.mkString(" ")
}
```
With these utility functions in place, the next step is to build the regular expressions and clean the entire document step by step.
```
import scala.io.Source

// Building a map of regex patterns for pre-processing the text
var RegexList = Map[String, String]()
RegexList += ("punctuation" -> "[^a-zA-Z0-9]")
RegexList += ("digits" -> "\\b\\d+\\b")
RegexList += ("white_space" -> "\\s+")
RegexList += ("small_words" -> "\\b[a-zA-Z0-9]{1,2}\\b")
RegexList += ("urls" -> "(https?\\://)\\S+")

// Loading a stopword list
var Stopwords = Map[String, List[String]]()
Stopwords += ("english" -> Source.fromFile("stopwords.txt").getLines().toList)

// Function to perform step-by-step text pre-processing and cleaning on a document
def cleanDocument(document_text: String): String = {
  // Converting all words to lowercase
  var text = document_text.toLowerCase
  // Removing URLs from the document
  text = removeRegex(text, "urls")
  // Removing punctuation from the document text
  text = removeRegex(text, "punctuation")
  // Removing digits from the document text
  text = removeRegex(text, "digits")
  // Removing all words with length less than or equal to 2
  text = removeRegex(text, "small_words")
  // Removing extra whitespace from the text
  text = removeRegex(text, "white_space")
  // Removing English stopwords
  text = removeCustomWords(text, "english")
  // Returning the pre-processed and cleaned document text
  text
}
```
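As a quick sanity check, the cleaning function can be tried on a single sentence. The sample text below is made up, and the exact output depends on the contents of stopwords.txt.

```
// Illustrative usage of cleanDocument; the sample sentence is hypothetical
val raw = "Check out https://example.com !! 25 brand new offers on TVs & PCs"
println(cleanDocument(raw))
// Roughly prints: "check brand new offers tvs pcs"
// (the URL, digits, punctuation, 1-2 letter words and stopwords such as "out" are stripped)
```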
2. Initializing Apache Spark
To use Spark, we need to initialize it and create the contexts that will be used for training the classifier, building the pipeline and applying the necessary transformations. The following lines of code can be used for this purpose.
```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setMaster("local[*]").setAppName("DC")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
```
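Note that SQLContext is the older entry point. On Spark 2.x and later, the same setup is usually done through a SparkSession; the sketch below is an optional alternative, and the rest of this tutorial keeps the SparkContext/SQLContext-based code above.

```
import org.apache.spark.sql.SparkSession

// Alternative entry point for Spark 2.x+ (optional; same master and app name as above)
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("DC")
  .getOrCreate()

// The underlying SparkContext is still accessible if needed
val sc = spark.sparkContext
```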
3. Dataset Preparation - Loading the Documents
Now we need to load the documents, create a DataFrame using the SQL context and split it into training and test DataFrames. This involves reading the text file containing the documents, converting the resulting RDD into a DataFrame and finally slicing that DataFrame into training and test sets.
```
// Loading the text file using sc.textFile and creating an RDD
// Each input line is expected to be: Id,Text,Category
// RDD shape after cleaning: (Id, CleanedText, Category)
val input_path = "/path/to/data.txt"
val input_RDD = sc.textFile(input_path).map(x => {
  val row = x.split(",")
  (row(0), cleanDocument(row(1)), row(2))
})

// Converting the RDD to a DataFrame
val trainingDF = sqlContext.createDataFrame(input_RDD)
  .toDF("id", "cleaned", "category")

// Slicing the data into a 70:30 ratio for training and test data
val Array(trainingData, testData) = trainingDF.randomSplit(Array(0.7, 0.3))

// Print the training data
trainingData.show()
```
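The snippet above assumes that each line of data.txt holds an id, the raw document text and its category, separated by commas. The rows below are purely hypothetical and only illustrate the expected layout.

```
// Hypothetical rows showing the assumed "id,text,category" layout of data.txt
val sampleLines = Seq(
  "1,Stock markets rallied after the central bank kept rates unchanged,business",
  "2,The home side clinched the title with a stoppage time goal,sports"
)
// Each line is split on "," into id (row(0)), text (row(1)) and category (row(2))
```

Note that this naive split assumes the document text itself contains no commas.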
4. Creating the MLlib Pipeline
In the next step, we will prepare the processing and classification pipeline using MLlib. The pipeline consists of a StringIndexer (to convert category names into label indexes), a Tokenizer (to convert the cleaned text into tokens, i.e. words) and a HashingTF stage (to build a term-frequency vector for every document using MLlib's hashing-trick implementation; these term frequencies act as the features of every document). For the classification component we will use logistic regression, and since this is a multi-class classification problem, we will wrap the model in a one-vs-rest classifier.
```
import org.apache.spark.ml.feature.{StringIndexer, Tokenizer, HashingTF}
import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}

// Processing
val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("label")

val tokenizer = new Tokenizer()
  .setInputCol("cleaned")
  .setOutputCol("tokens")

val hashingTF = new HashingTF()
  .setInputCol("tokens")
  .setOutputCol("features")
  .setNumFeatures(20)

// Classification
val lr = new LogisticRegression().setMaxIter(100).setRegParam(0.001)
val ovr = new OneVsRest().setClassifier(lr)
```
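One common refinement, not part of the pipeline built here, is to follow HashingTF with an IDF stage so that very frequent terms are down-weighted. The sketch below shows how such a stage would look if you chose to add it; the "tfidf" column name is only illustrative.

```
import org.apache.spark.ml.feature.IDF

// Optional TF-IDF weighting on top of the hashed term frequencies (not used in the pipeline above)
val idf = new IDF()
  .setInputCol("features")
  .setOutputCol("tfidf")
// If added as a pipeline stage, the classifier would need .setFeaturesCol("tfidf")
```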
5. Training the Model
Next, we create the final pipeline from all of the components and fit the model on the training data.
```
import org.apache.spark.ml.Pipeline

val pipeline = new Pipeline()
  .setStages(Array(indexer, tokenizer, hashingTF, ovr))

val model = pipeline.fit(trainingData)
```
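If the trained pipeline needs to be reused later, it can also be persisted to disk and loaded back in another job; the path below is illustrative.

```
import org.apache.spark.ml.PipelineModel

// Persisting the fitted pipeline (the path is illustrative)
model.write.overwrite().save("/path/to/saved_model")

// Loading it back later
val savedModel = PipelineModel.load("/path/to/saved_model")
```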
6. Prediction on Test Data
Once the model is trained, it can be used to make predictions on the test data. A confusion matrix or cross-validation can then be used to measure the accuracy of the pipeline, as sketched after the code below.
```
// Making predictions on the test data with the trained pipeline
val prediction = model.transform(testData)

// Printing a few predictions (id, cleaned text, actual category, predicted label index)
prediction.select("id", "cleaned", "category", "prediction").show()
```
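As noted above, the quality of the pipeline should be measured on the held-out split. A minimal sketch using MLlib's MulticlassClassificationEvaluator (available on Spark 2.x+) is shown below; a confusion matrix or cross-validation can be layered on top of this.

```
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Overall accuracy of the pipeline on the test split
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")

println(s"Test accuracy = ${evaluator.evaluate(prediction)}")
```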
7. Conclusion and Next Steps
The full code for this tutorial can be found here. This tutorial explained how to create a pipeline for document classification in Spark using Scala. The resulting end-to-end pipeline is capable of predicting the unknown classes of new text with decent accuracy. The obvious next steps are to improve each of the components involved in the pipeline. Refer to the official MLlib documentation and the Spark programming guide for more details. Feel free to share your thoughts in the comments.