- Drupal 8 Development in PHP
- Migration Tutorials for Drupal 8 (primarily from Drupal 7, or from other systems)
- Technical Notes for Config of Drupal 7
- Ubuntu 16 Setup Notes for a Web Development System
Technical notes about past publications and work by Darrell Ulm, including Apache Spark, software development, computer programming, parallel computing, algorithms, Koha, and Drupal. Source code snippets, such as Python for Spark. A retrospective of projects.
Catch up on Drupal and Ubuntu Linux Posts
Getting back into parallel computing with Apache Spark
Returning to parallel computing with Apache Spark has been insightful, especially observing the increasing mainstream adoption of the McColl and Valiant BSP (Bulk Synchronous Parallel) model beyond GPUs. This structured approach to parallel computation, with its emphasis on synchronized supersteps, offers a practical framework for diverse parallel architectures.

While setting up Spark on clusters can involve effort and introduce overhead, ongoing optimizations are expected to enhance its efficiency over time. Improvements in data handling, memory management, and query execution aim to streamline parallel processing.

A GitHub repository for Spark snippets has been created as a resource for practical examples. As Apache Spark continues to evolve in parallel with the HDFS (Hadoop Distributed File System), this repository intends to showcase solutions leveraging their combined strengths for scalable data processing.
Scala Version of Approximation Algorithm for Knapsack Problem for Apache Spark
I ran this on a local setup, so it may require modification if you are using something like a Databricks environment. You will also likely need to set up your Scala environment, as sketched below.
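As a rough idea of what that setup might look like, here is a minimal build.sbt sketch for a standalone Scala project; the Spark and Scala versions shown are assumptions, so adjust them to your installation. If you instead paste the snippets into spark-shell, the spark session, sc, and the DataFrame implicits are already provided for you.
// build.sbt -- minimal sketch; the versions below are assumptions, adjust as needed.
name := "knapsack-spark"
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "2.4.8" % "provided",
  "org.apache.spark" %% "spark-sql"   % "2.4.8" % "provided",
  "org.apache.spark" %% "spark-mllib" % "2.4.8" % "provided"  // RandomRDDs is used in the test code
)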
All the code for this is at GitHub
First, let's import all the libraries we need.
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum
We'll define the object knapsack; it could be named more specifically for what it does, but it's good enough for this simple test.
object knapsack {
Next, we'll define the knapsack approximation function, expecting a DataFrame with the profits and weights, as well as W, the total allowed weight.
def knapsackApprox(knapsackDF: DataFrame, W: Double): DataFrame = {
Calculate the ratio of profit over weight for each item, and sort from high to low ratio. Discard any items whose weights are already larger than the max knapsack size, W.
val ratioDF = knapsackDF.withColumn("ratio", knapsackDF("values") / knapsackDF("weights"))
val newRatioDF = (ratioDF
.filter(ratioDF("weights") <= W)
.sort(ratioDF("ratio").desc)
)
Now we'll use SQL with a windowed sum to add up the partial sums of weights; the same step could also be done with the DataFrame window API, sketched after the function below. This tells us what can fit in the knapsack, and remember these rows are sorted by profit-to-weight ratio, high to low.
newRatioDF.createOrReplaceTempView("tempTable")
// Get the active SparkSession ("spark" is already defined in spark-shell).
val spark = SparkSession.builder.getOrCreate()
val partialSumWeightsDF = spark.sql("SELECT item, weights, values, ratio, sum(weights) OVER (ORDER BY ratio desc) as partSumWeights FROM tempTable")
val partialSumWeightsFilteredDF = (
partialSumWeightsDF
.filter(partialSumWeightsDF("partSumWeights") <= W)
)
And now return this new DataFrame, which will have only the objects that fit.
partialSumWeightsFilteredDF
}
}
So this returns the greedy solution, which is fast and parallelizes easily, but is not optimal. Parallel solutions for optimal knapsack algorithms are often not as simple, but this was a good way to test out Spark using Scala.
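The partial sums above are computed with a SQL string; a minimal sketch of the same step using the DataFrame window API instead might look like the following, written against the newRatioDF and W already defined inside knapsackApprox (the variable names here are only illustrative):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
// Running (partial) sum of weights over rows ordered by ratio, high to low.
// With an ORDER BY and no explicit frame, the window defaults to
// RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, matching the SQL version.
val byRatioDesc = Window.orderBy(newRatioDF("ratio").desc)
val partialSumWindowDF = newRatioDF.withColumn("partSumWeights", sum(newRatioDF("weights")).over(byRatioDesc))
val selectedDF = partialSumWindowDF.filter(partialSumWindowDF("partSumWeights") <= W)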
And here is the test code, which is fairly self-explanatory; the GitHub repo is a work in progress and has some cleanup left to do.
import org.apache.spark.mllib.random.RandomRDDs._
import scala.collection.mutable.ListBuffer
// Knapsack problem size.
val N = 10
// Random
val r = scala.util.Random
// Setup sample data for knapsack.
val knapsackDataListBuffer = ListBuffer[(String, Double, Double)]()
for (k <- 1 to N) {
knapsackDataListBuffer += (("item_" + k.toString, r.nextDouble() * 10.0, r.nextDouble() * 10.0))
}
val knapsackDataList = knapsackDataListBuffer.toList
// Make a Dataframe with item(s), weight(s), and value(s) for the knapsack.
val knapsackData = sc.parallelize(knapsackDataList).toDF("item", "weights", "values")
// Display the original data
println("Original Data:")
knapsackData.show()
println("\r\n")
// Create a random maximum weight
val start = N * 0.3
val end = N * 0.6
val W = (math.random * (end - start) + start)
// Show the weight.
println("W: ")
println(W)
println("\r\n")
// Call the knapsack greedy approximation function with the data and maximum weight W.
val knapResults = knapsack.knapsackApprox(knapsackData, W)
// Show the results Dataframe.
println("Selected Elements:")
knapResults.show()
println("\r\n")
// Find the totals.
val valuesResult = knapResults.agg(sum("values"))
val weightsResult = knapResults.agg(sum("weights"))
val countResult = knapResults.count()
// Show totals for selected elements of knapsack.
println("Value Total:")
valuesResult.show()
println("\r\n")
println("Weights Total:")
weightsResult.show()
println("\r\n")
println("Count:")
println(countResult)
println("\r\n")
And that is it: just create some random items, call the knapsackApprox(knapsackData, W) function, and print out the results. Note that I compute the sums outside of the main knapsack routine, which just finds the objects that satisfy the problem. Next tasks are: clean up the Scala code, convert to a window function, and complete the Java version.
Apache Spark Knapsack Approximation Algorithm in Python
The GitHub code repo for the knapsack approximation algorithms is here, and it includes a Scala solution. Work on a Java version is in progress at the time of this writing.
Below we have the code that computes the solution that fits within the knapsack W for a set of items, each with its own weight and profit value. We look to maximize the sum of the selected items' profits while not exceeding the total allowed weight, W.
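Stated formally, with $x_i \in \{0,1\}$ indicating whether item $i$ is selected, $v_i$ its profit value, and $w_i$ its weight, the standard 0-1 knapsack objective the code approximates is:
$$\max \sum_{i=1}^{N} v_i x_i \quad \text{subject to} \quad \sum_{i=1}^{N} w_i x_i \le W, \qquad x_i \in \{0,1\}.$$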
First we import some spark libraries into Python.
# Knapsack 0-1 function weights, values and size-capacity.
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.functions import col
from pyspark.sql.functions import sum
Now define the function, which will take a Spark Dataframe with all the items, each with a name, weight and profit, and the global total weight.
def knapsackApprox(knapsackDF, W):
'''
A simple greedy parallel implementation of 0-1 Knapsack algorithm.
Parameters
----------
knapsackDF : Spark Dataframe with knapsack data
sqlContext.createDataFrame(knapsackData, ['item', 'weights', 'values'])
W : float
Total weight allowed for knapsack.
Returns
-------
Dataframe
Dataframe with results.
'''
Add a column that is, for each item, the ratio of profit over weight. Disregard all items whose weights are larger than the global knapsack weight W, since they will not fit regardless. Then sort, in Spark, all item rows by the ratio value, high to low.
# Add ratio of values / weights column.
ratioDF = (knapsackDF.withColumn("ratio", lit(knapsackDF.values / knapsackDF.weights))
.filter(col("weights") <= W)
.sort(col("ratio").desc())
)
Get a current Spark Session to feed to the sql command.
# Get the current Spark Session.
sc = SparkSession.builder.getOrCreate()
Now we take the sorted list of filtered items, with their profit ratios, and compute the partial sums of weights. Each element's weight is added to the running sum of the weights before it, which is how we will select the items that keep the total under W. There are other ways to do this; here we use SQL with a windowed sum() as shown below.
# Use SQL with a windowed sum to calculate the partial sums of the weights.
ratioDF.createOrReplaceTempView("tempTable")
partialSumWeightsDF = sc.sql("""
SELECT
item,
weights,
values,
ratio,
sum(weights) OVER (ORDER BY ratio desc) as partSumWeights
FROM
tempTable
""")
Now that we have the partial sums of weights with the DataFrame of elements sorted by profit-to-weight ratio, we keep only the items whose partial sum of weights is at most the knapsack size W, and this is the final solution set.
# Keep the items whose partial sum of weights is less than or equal to W.
partialSumWeightsFilteredDF = (
partialSumWeightsDF.sort(col("ratio").desc())
.filter(col("partSumWeights") <= W)
)
Return the partialSumWeightsFilteredDF as the solution set.
# Return the solution elements with total values, weights and count.
return partialSumWeightsFilteredDF
A non-approximate, optimal solution would need a substantially different algorithm, and Bulk Synchronous Parallel (BSP) approaches would be a good place to start looking.
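For reference, a minimal sketch of what an exact solution looks like in its standard sequential dynamic-programming form is below (plain Scala rather than Spark, with integer weights assumed); parallel and BSP-style versions build on the same recurrence.
// Sketch: classic sequential 0-1 knapsack dynamic program (not Spark).
// Assumes integer weights; returns the best achievable total value.
def knapsackExact(values: Array[Double], weights: Array[Int], W: Int): Double = {
  val best = Array.fill(W + 1)(0.0)
  for (i <- values.indices) {
    // Iterate capacities downward so each item is used at most once.
    for (c <- W to weights(i) by -1) {
      best(c) = math.max(best(c), best(c - weights(i)) + values(i))
    }
  }
  best(W)
}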
Testing:
Now to use this function, below is some code to plug in values and test the knapsack approximation solution.
# --------------------------------------------
# Test the Approximate Knapsack function test
# --------------------------------------------
# Pull in the knapsack library.
import random
from pyspark.sql import SparkSession
from knapsack import knapsack
# Create the SparkSession.
sc = SparkSession \
.builder \
.appName("Knapsack Approximation Algorithm Test") \
.getOrCreate()
Set the problem size to 10.
# Knapsack problem size.
N = 10
Set up a Python list with some uniform random data for the N items.
# Setup sample data for knapsack.
knapsackData = [('item_' + str(k), random.uniform(1.0, 10.0), random.uniform(1.0, 10.0)) for k in range(N)]
Now create the knapsack items DataFrame, with column names item, weights, and values, using the list knapsackData.
# Make a Dataframe with item(s), weight(s), and value(s) for the knapsack.
knapsackData = sc.createDataFrame(knapsackData, ['item', 'weights', 'values'])
# Display the original data
print "Original Data:"
print knapsackData.show()
print "\n"
The maximum weight is random, based on a range derived from the number of items N, and we could set it in a number of ways.
# Create a random maximum weight
W = random.uniform(N * 1.3, N * 1.6)
# Show the weight.
print "W: "
print W
print "\n"
Now we just call the knapsack approximation method passing the knapsackData and total knapsack size W.
# Call the knapsack greedy approximation function with the data and maximum weight W.
k = knapsack.knapsackApprox(knapsackData, W)
That's it, so we show the results of the selected elements:
# Show the results Dataframe.
print "Selected Elements:"
print k.show()
print "\n"
And add up the totals via the RDD map() method with lambda functions to get the sum of values and weights. The sum of values is the total knapsack profit.
# Show totals for selected elements of knapsack.
sumValues = k.rdd.map(lambda x: x["values"]).reduce(lambda x, y: x+y)
sumWeights = k.rdd.map(lambda x: x["weights"]).reduce(lambda x, y: x+y)
numResults = k.count()
print "Totals:"
print "Sum Values: ", sumValues
print "Sum Weights: ", sumWeights
print numResults
print "\n"
# ------------------------------------------
# End of Approximate Knapsack function test
# ------------------------------------------
That is all. The Scala version is very similar to this, and I have not yet tested it with very large N to see how long it takes. The sorts may have performance implications, as may the method used to take the partial sums. Tests showed it to work correctly.
The code here can also be found at: Spark.Packages.Org for the knapsack approximation.
A way to Merge Columns of DataFrames in Spark with no Common Column Key
Once the IDs are added, a DataFrame join will merge all the columns into one Dataframe.
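A minimal sketch of one way this can work (shown in Scala), assuming the IDs mentioned above are consecutive row indices added with the RDD zipWithIndex method; the helper name withRowIndex is hypothetical, and zipWithIndex is used because monotonically_increasing_id does not produce consecutive values for matching rows by position:
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}
// Append a consecutive row-index column to a DataFrame using RDD zipWithIndex.
def withRowIndex(df: DataFrame, colName: String = "row_id"): DataFrame = {
  val rddWithIndex = df.rdd.zipWithIndex.map { case (row, idx) =>
    Row.fromSeq(row.toSeq :+ idx)
  }
  val schema = StructType(df.schema.fields :+ StructField(colName, LongType, nullable = false))
  df.sparkSession.createDataFrame(rddWithIndex, schema)
}
// Usage: give both DataFrames the same row_id, join on it, then drop it.
// val merged = withRowIndex(dfA).join(withRowIndex(dfB), "row_id").drop("row_id")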