Mobius adds C# language binding to Apache Spark, enabling the implementation of Spark driver code and data processing operations in C#.
For example, the word count sample in Apache Spark can be implemented in C# as follows:
var lines = sparkContext.TextFile(@"hdfs://path/to/input.txt");
var words = lines.FlatMap(s => s.Split(' '));
var wordCounts = words.Map(w => new KeyValuePair<string, int>(w.Trim(), 1))
.ReduceByKey((x, y) => x + y);
var wordCountCollection = wordCounts.Collect();
wordCounts.SaveAsTextFile(@"hdfs://path/to/wordcount.txt");
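To make clear what the pipeline above computes, here is a minimal local sketch of the same three steps using plain LINQ on an in-memory sequence. It does not use Mobius or Spark at all; the class name `LocalWordCount` and the sample input are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of Mobius: mirrors FlatMap -> Map -> ReduceByKey locally.
public static class LocalWordCount
{
    public static Dictionary<string, int> Count(IEnumerable<string> lines)
    {
        return lines
            // FlatMap: one line becomes many words
            .SelectMany(s => s.Split(' '))
            // Map: pair each word with an initial count of 1
            .Select(w => new KeyValuePair<string, int>(w.Trim(), 1))
            // ReduceByKey: group by word, then fold the counts with (x, y) => x + y
            .GroupBy(kvp => kvp.Key, kvp => kvp.Value)
            .ToDictionary(g => g.Key, g => g.Aggregate((x, y) => x + y));
    }

    public static void Main()
    {
        var counts = Count(new[] { "the quick fox", "the lazy dog" });
        foreach (var kvp in counts)
            Console.WriteLine("{0}: {1}", kvp.Key, kvp.Value);
    }
}
```

As in the Spark version, splitting on a single space means consecutive spaces produce empty strings, which are counted under the empty key.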
A simple DataFrame application using TempTable may look like the following:
var reqDataFrame = sqlContext.TextFile(@"hdfs://path/to/requests.csv");
var metricDataFrame = sqlContext.TextFile(@"hdfs://path/to/metrics.csv");
reqDataFrame.RegisterTempTable("requests");
metricDataFrame.RegisterTempTable("metrics");
// C0 - guid in requests DataFrame, C3 - guid in metrics DataFrame
var joinDataFrame = sqlContext.Sql(
"SELECT joinedtable.datacenter" +
", MAX(joinedtable.latency) maxlatency" +
", AVG(joinedtable.latency) avglatency " +
"FROM (" +
"SELECT a.C1 as datacenter, b.C6 as latency " +
"FROM requests a JOIN metrics b ON a.C0 = b.C3) joinedtable " +
"GROUP BY datacenter");
joinDataFrame.ShowSchema();
joinDataFrame.Show();
A simple DataFrame application using DataFrame DSL may look like the following:
// C0 - guid, C1 - datacenter
var reqDataFrame = sqlContext.TextFile(@"hdfs://path/to/requests.csv")
.Select("C0", "C1");
// C3 - guid, C6 - latency
var metricDataFrame = sqlContext.TextFile(@"hdfs://path/to/metrics.csv", ",", false, true)
.Select("C3", "C6"); //override delimiter, hasHeader & inferSchema
var joinDataFrame = reqDataFrame.Join(metricDataFrame, reqDataFrame["C0"] == metricDataFrame["C3"])
.GroupBy("C1");
var maxLatencyByDcDataFrame = joinDataFrame.Agg(new Dictionary<string, string> { { "C6", "max" } });
maxLatencyByDcDataFrame.ShowSchema();
maxLatencyByDcDataFrame.Show();
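The join, grouping, and `max` aggregation in the DataFrame samples can be pictured with a small local equivalent. The sketch below uses plain LINQ on in-memory objects rather than Mobius; the `Request` and `Metric` types, their field names, and the sample values are assumptions made for illustration, standing in for columns C0/C1 and C3/C6.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical local model, not part of Mobius.
public static class LocalLatencyReport
{
    public sealed class Request { public string Guid; public string Datacenter; } // C0, C1
    public sealed class Metric { public string Guid; public double Latency; }     // C3, C6

    public static Dictionary<string, double> MaxLatencyByDatacenter(
        IEnumerable<Request> requests, IEnumerable<Metric> metrics)
    {
        return requests
            // Inner join on guid (C0 == C3)
            .Join(metrics, r => r.Guid, m => m.Guid,
                  (r, m) => new { r.Datacenter, m.Latency })
            // Group by datacenter (C1) and take the maximum latency (C6)
            .GroupBy(x => x.Datacenter)
            .ToDictionary(g => g.Key, g => g.Max(x => x.Latency));
    }

    public static void Main()
    {
        var requests = new[]
        {
            new Request { Guid = "a", Datacenter = "dc1" },
            new Request { Guid = "b", Datacenter = "dc1" },
            new Request { Guid = "c", Datacenter = "dc2" }
        };
        var metrics = new[]
        {
            new Metric { Guid = "a", Latency = 10 },
            new Metric { Guid = "b", Latency = 25 },
            new Metric { Guid = "c", Latency = 7 }
        };
        foreach (var kvp in MaxLatencyByDatacenter(requests, metrics))
            Console.WriteLine("{0}: {1}", kvp.Key, kvp.Value);
    }
}
```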
A simple Spark Streaming application that processes messages from Kafka using C# may be implemented using the following code:
StreamingContext sparkStreamingContext = StreamingContext.GetOrCreate(checkpointPath, () =>
{
var ssc = new StreamingContext(sparkContext, slideDurationInMillis);
ssc.Checkpoint(checkpointPath);
var stream = KafkaUtils.CreateDirectStream(ssc, topicList, kafkaParams, perTopicPartitionKafkaOffsets);
//message format: [timestamp],[loglevel],[logmessage]
var countByLogLevelAndTime = stream
.Map(kvp => Encoding.UTF8.GetString(kvp.Value))
.Filter(line => line.Contains(","))
.Map(line => line.Split(','))
.Map(columns => new KeyValuePair<string, int>(
string.Format("{0},{1}", columns[0], columns[1]), 1))
.ReduceByKeyAndWindow((x, y) => x + y, (x, y) => x - y,
windowDurationInSecs, slideDurationInSecs, 3)
.Map(logLevelCountPair => string.Format("{0},{1}",
logLevelCountPair.Key, logLevelCountPair.Value));
countByLogLevelAndTime.ForeachRDD(countByLogLevel =>
{
foreach (var logCount in countByLogLevel.Collect())
Console.WriteLine(logCount);
});
return ssc;
});
sparkStreamingContext.Start();
sparkStreamingContext.AwaitTermination();
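The `ReduceByKeyAndWindow` call above passes both a reduce function `(x, y) => x + y` and an inverse function `(x, y) => x - y`. The idea behind the inverse function is that a sliding window total can be updated incrementally: add the batch entering the window and subtract the batch leaving it, instead of re-reducing the whole window. The following is a minimal sketch of that idea in plain C# for a single key. It is not how Mobius or Spark implements the operation; `SlidingWindowCount`, its parameters, and the assumption that the window slides by exactly one batch are all hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration, not part of Mobius.
public static class SlidingWindowCount
{
    // Returns the windowed total after each batch, for a window of
    // windowLength batches that slides forward one batch at a time.
    public static List<int> WindowedTotals(
        IList<int> batchCounts, int windowLength,
        Func<int, int, int> reduce, Func<int, int, int> inverseReduce)
    {
        var totals = new List<int>();
        int current = 0;
        for (int i = 0; i < batchCounts.Count; i++)
        {
            // The new batch enters the window
            current = reduce(current, batchCounts[i]);
            // The oldest batch leaves the window once the window is full
            if (i >= windowLength)
                current = inverseReduce(current, batchCounts[i - windowLength]);
            totals.Add(current);
        }
        return totals;
    }

    public static void Main()
    {
        var totals = WindowedTotals(
            new[] { 3, 1, 4, 1, 5 }, 3,
            (x, y) => x + y, (x, y) => x - y);
        Console.WriteLine(string.Join(", ", totals)); // prints "3, 4, 8, 6, 10"
    }
}
```

The incremental form only works when the inverse function exactly undoes the reduce function, as subtraction does for addition.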
Refer to the Mobius\csharp\Samples directory and sample usage for complete samples.
Refer to the Mobius C# API documentation for the list of Spark's data processing operations supported in Mobius.
Mobius API usage samples are available at:
- Examples folder, which contains standalone C# projects that can be used as templates to start developing Mobius applications
- Samples project, which uses a comprehensive set of Mobius APIs to implement samples that are also used for functional validation of APIs
- Mobius performance test scenarios implemented in C# and Scala for side-by-side comparison of Spark driver code
Refer to the docs folder for a design overview and other information on Mobius.
| | Windows | Linux |
|---|---|---|
| Build & run unit tests | Build in Windows | Build in Linux |
| Run samples (functional tests) in local mode | Samples in Windows | Samples in Linux |
| Run examples in local mode | Examples in Windows | Examples in Linux |
| Run Mobius app | | |
Mobius is built and tested with Apache Spark 1.4.1, 1.5.2 and 1.6.*.
Mobius releases are available at https://github.com/Microsoft/Mobius/releases. References needed to build a C# Spark driver application using Mobius are also available in NuGet.
Refer to mobius-release-info.md for the details on versioning policy and the contents of the release.
Mobius is licensed under the MIT license. See the LICENSE file for full license information.
- The Mobius project welcomes contributions. To contribute, follow the instructions in CONTRIBUTING.md.
- Options for asking the Mobius community a question:
  - create an issue on GitHub
  - create a post with the "sparkclr" tag on Stack Overflow
  - join the chat in the Mobius room on Gitter
  - tweet @MobiusForSpark
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.