Skip to content

Commit

Permalink
Merge pull request microsoft#175 from tawan0109/dataframe
Browse files Browse the repository at this point in the history
DataFrame API Na(), Read(), JsonFile() and Map()
  • Loading branch information
tawan0109 committed Dec 23, 2015
2 parents 42ae9e3 + 10346d3 commit fe55959
Showing 23 changed files with 1,721 additions and 33 deletions.
6 changes: 6 additions & 0 deletions csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj
Original file line number Diff line number Diff line change
@@ -84,10 +84,14 @@
<Compile Include="Interop\Ipc\PayloadHelper.cs" />
<Compile Include="Interop\Ipc\SerDe.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Proxy\IDataFrameNaFunctionsProxy.cs" />
<Compile Include="Proxy\IDataFrameProxy.cs" />
<Compile Include="Proxy\IDataFrameReaderProxy.cs" />
<Compile Include="Proxy\IDataFrameWriterProxy.cs" />
<Compile Include="Proxy\IDStreamProxy.cs" />
<Compile Include="Proxy\Ipc\DataFrameIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DataFrameNaFunctionsIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DataFrameReaderIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DataFrameWriterIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DStreamIpcProxy.cs" />
<Compile Include="Proxy\Ipc\RDDIpcProxy.cs" />
@@ -112,6 +116,8 @@
<Compile Include="Services\LoggerServiceFactory.cs" />
<Compile Include="Sql\Column.cs" />
<Compile Include="Sql\DataFrame.cs" />
<Compile Include="Sql\DataFrameNaFunctions.cs" />
<Compile Include="Sql\DataFrameReader.cs" />
<Compile Include="Sql\DataFrameWriter.cs" />
<Compile Include="Sql\PythonSerDe.cs" />
<Compile Include="Sql\RowConstructor.cs" />
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Microsoft.Spark.CSharp.Proxy
{
/// <summary>
/// Proxy for the JVM DataFrameNaFunctions object, which provides functionality
/// for working with missing (null/NaN) data in a DataFrame.
/// Made explicitly internal for consistency with the sibling proxy interfaces.
/// </summary>
internal interface IDataFrameNaFunctionsProxy
{
    /// <summary>Drops rows containing fewer than <paramref name="minNonNulls"/> non-null values in the specified columns.</summary>
    IDataFrameProxy Drop(int minNonNulls, string[] cols);
    /// <summary>Replaces null values in the specified numeric columns with <paramref name="value"/>.</summary>
    IDataFrameProxy Fill(double value, string[] cols);
    /// <summary>Replaces null values in the specified string columns with <paramref name="value"/>.</summary>
    IDataFrameProxy Fill(string value, string[] cols);
    /// <summary>Replaces null values per column, keyed by column name in <paramref name="valueMap"/>.</summary>
    IDataFrameProxy Fill(Dictionary<string, object> valueMap);
    /// <summary>Replaces occurrences of the keys of <paramref name="replacement"/> with their mapped values in column <paramref name="col"/>.</summary>
    IDataFrameProxy Replace<T>(string col, Dictionary<T, T> replacement);
    /// <summary>Replaces occurrences of the keys of <paramref name="replacement"/> with their mapped values in the specified columns.</summary>
    IDataFrameProxy Replace<T>(string[] cols, Dictionary<T, T> replacement);
}
}
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ internal interface IDataFrameProxy
IDataFrameProxy UnionAll(IDataFrameProxy otherScalaDataFrameReference);
IDataFrameProxy Subtract(IDataFrameProxy otherScalaDataFrameReference);
IDataFrameProxy Drop(string columnName);
// DropNa(int?, string[]) was superseded by Na(), which exposes the full
// DataFrameNaFunctions API through a dedicated proxy.
IDataFrameNaFunctionsProxy Na();
IDataFrameProxy DropDuplicates();
IDataFrameProxy DropDuplicates(string[] subset);
IDataFrameProxy Replace<T>(object subset, Dictionary<T, T> toReplaceAndValueDict);
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Sql;

namespace Microsoft.Spark.CSharp.Proxy
{
/// <summary>
/// Proxy for the JVM DataFrameReader object, used to load a DataFrame from
/// external storage systems.
/// </summary>
internal interface IDataFrameReaderProxy
{
    /// <summary>Specifies the input data source format.</summary>
    void Format(string source);
    /// <summary>Specifies the input schema.</summary>
    void Schema(StructType schema);
    /// <summary>Adds input options for the underlying data source.</summary>
    void Options(Dictionary<string, string> options);
    /// <summary>Loads input as a DataFrame from the configured source.</summary>
    IDataFrameProxy Load();
    /// <summary>Constructs a DataFrame representing the database table, using the given predicates to partition it.</summary>
    IDataFrameProxy Jdbc(string url, string table, string[] predicates, Dictionary<string, string> connectionProperties);
    /// <summary>Constructs a DataFrame representing the database table.</summary>
    IDataFrameProxy Jdbc(string url, string table, Dictionary<string, string> properties);
    /// <summary>Constructs a DataFrame representing the database table, partitioned by ranges of the given column.</summary>
    IDataFrameProxy Jdbc(string url, string table, string columnName, string lowerBound, string upperBound,
        int numPartitions, Dictionary<string, string> connectionProperties);
    /// <summary>Loads the given Parquet file paths as a DataFrame.</summary>
    IDataFrameProxy Parquet(string[] paths);
    /// <summary>Returns the specified table as a DataFrame.</summary>
    IDataFrameProxy Table(string tableName);
}
}
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ namespace Microsoft.Spark.CSharp.Proxy
{
internal interface ISqlContextProxy
{
/// <summary>Returns a proxy for the JVM DataFrameReader obtained from this SQLContext.</summary>
IDataFrameReaderProxy Read();
/// <summary>Creates a DataFrame from an RDD of serialized rows and the given schema.</summary>
IDataFrameProxy CreateDataFrame(IRDDProxy rddProxy, IStructTypeProxy structTypeProxy);
/// <summary>Reads a DataFrame from the given path using the supplied schema and data source options.</summary>
IDataFrameProxy ReadDataFrame(string path, StructType schema, Dictionary<string, string> options);
/// <summary>Loads a JSON file (one object per line) as a DataFrame.</summary>
IDataFrameProxy JsonFile(string path);
Original file line number Diff line number Diff line change
@@ -334,18 +334,12 @@ public IDataFrameProxy Drop(string columnName)
}

/// <summary>
/// Call https://github.com/apache/spark/blob/branch-1.4/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala, na(): DataFrame
/// </summary>
/// <returns>Proxy for the JVM DataFrameNaFunctions associated with this DataFrame</returns>
public IDataFrameNaFunctionsProxy Na()
{
    // NOTE(review): the previous DropNa(int?, string[]) implementation was replaced by
    // this method; the diff artifact interleaving both bodies (two signatures, two
    // return statements) is removed here, keeping only the new implementation.
    // The JVM "na" call returns the reference id (a string) of a DataFrameNaFunctions
    // object, which is wrapped in a dedicated proxy rather than a DataFrameIpcProxy.
    return new DataFrameNaFunctionsIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
        this.jvmDataFrameReference, "na")), sqlContextProxy);
}

/// <summary>
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Spark.CSharp.Interop.Ipc;

namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
[ExcludeFromCodeCoverage] //IPC calls to JVM validated using validation-enabled samples - unit test coverage not required
internal class DataFrameNaFunctionsIpcProxy : IDataFrameNaFunctionsProxy
{
    // Reference to the JVM-side DataFrameNaFunctions object all calls are routed to.
    private readonly JvmObjectReference naFunctionsJvmRef;
    private readonly ISqlContextProxy sqlContext;

    internal DataFrameNaFunctionsIpcProxy(JvmObjectReference jvmDataFrameNaFunctionsReference, ISqlContextProxy sqlContextProxy)
    {
        naFunctionsJvmRef = jvmDataFrameNaFunctionsReference;
        sqlContext = sqlContextProxy;
    }

    // Invokes the named method on the JVM DataFrameNaFunctions object and wraps the
    // returned DataFrame reference id in a DataFrameIpcProxy. Forwarding the args
    // array through params preserves the exact argument list of the direct calls.
    private IDataFrameProxy CallAndWrap(string methodName, params object[] args)
    {
        var dataFrameId = SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
            naFunctionsJvmRef, methodName, args).ToString();
        return new DataFrameIpcProxy(new JvmObjectReference(dataFrameId), sqlContext);
    }

    /// <summary>Drops rows with fewer than minNonNulls non-null values in the given columns.</summary>
    public IDataFrameProxy Drop(int minNonNulls, string[] cols)
    {
        return CallAndWrap("drop", minNonNulls, cols);
    }

    /// <summary>Fills null values in the given numeric columns with the given value.</summary>
    public IDataFrameProxy Fill(double value, string[] cols)
    {
        return CallAndWrap("fill", value, cols);
    }

    /// <summary>Fills null values in the given string columns with the given value.</summary>
    public IDataFrameProxy Fill(string value, string[] cols)
    {
        return CallAndWrap("fill", value, cols);
    }

    /// <summary>Fills null values per column, keyed by column name in valueMap.</summary>
    public IDataFrameProxy Fill(Dictionary<string, object> valueMap)
    {
        return CallAndWrap("fill", valueMap);
    }

    /// <summary>Replaces the keys of replacement with their mapped values in the given column.</summary>
    public IDataFrameProxy Replace<T>(string col, Dictionary<T, T> replacement)
    {
        return CallAndWrap("replace", col, replacement);
    }

    /// <summary>Replaces the keys of replacement with their mapped values in the given columns.</summary>
    public IDataFrameProxy Replace<T>(string[] cols, Dictionary<T, T> replacement)
    {
        return CallAndWrap("replace", cols, replacement);
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Interop.Ipc;
using Microsoft.Spark.CSharp.Sql;

namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
[ExcludeFromCodeCoverage] //IPC calls to JVM validated using validation-enabled samples - unit test coverage not required
internal class DataFrameReaderIpcProxy : IDataFrameReaderProxy
{
    // Reference to the JVM-side DataFrameReader object all calls are routed to.
    private readonly JvmObjectReference jvmDataFrameReaderReference;
    private readonly ISqlContextProxy sqlContextProxy;

    internal DataFrameReaderIpcProxy(JvmObjectReference jvmDataFrameReaderReference, ISqlContextProxy sqlContextProxy)
    {
        this.jvmDataFrameReaderReference = jvmDataFrameReaderReference;
        this.sqlContextProxy = sqlContextProxy;
    }

    /// <summary>
    /// Specifies the input data source format on the JVM DataFrameReader.
    /// </summary>
    public void Format(string source)
    {
        SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDataFrameReaderReference, "format", new object[] { source });
    }

    /// <summary>
    /// Specifies the input schema on the JVM DataFrameReader.
    /// </summary>
    /// <exception cref="ArgumentException">If the schema is not backed by a StructTypeIpcProxy.</exception>
    public void Schema(StructType schema)
    {
        var structTypeIpcProxy = schema.StructTypeProxy as StructTypeIpcProxy;
        // Fail loudly instead of silently ignoring the call: the original no-op on a
        // failed cast would silently drop a user-supplied schema.
        if (structTypeIpcProxy == null)
            throw new ArgumentException("schema is not backed by a StructTypeIpcProxy", "schema");
        SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDataFrameReaderReference, "schema",
            new object[] { structTypeIpcProxy.JvmStructTypeReference });
    }

    /// <summary>
    /// Adds input options for the underlying data source on the JVM DataFrameReader.
    /// </summary>
    public void Options(Dictionary<string, string> options)
    {
        SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDataFrameReaderReference, "options", new object[] { options });
    }

    /// <summary>
    /// Loads input as a DataFrame from the configured source.
    /// </summary>
    public IDataFrameProxy Load()
    {
        return new DataFrameIpcProxy(new JvmObjectReference(
            SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDataFrameReaderReference, "load").ToString()), sqlContextProxy);
    }

    /// <summary>
    /// Constructs a DataFrame representing the database table, using the given predicates to partition it.
    /// </summary>
    public IDataFrameProxy Jdbc(string url, string table, string[] predicates, Dictionary<string, string> connectionProperties)
    {
        return new DataFrameIpcProxy(new JvmObjectReference(
            SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
                jvmDataFrameReaderReference, "jdbc", new object[] { url, table, predicates, connectionProperties }).ToString()), sqlContextProxy);
    }

    /// <summary>
    /// Constructs a DataFrame representing the database table.
    /// </summary>
    public IDataFrameProxy Jdbc(string url, string table, Dictionary<string, string> properties)
    {
        return new DataFrameIpcProxy(new JvmObjectReference(
            SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
                jvmDataFrameReaderReference, "jdbc", new object[] { url, table, properties }).ToString()), sqlContextProxy);
    }

    /// <summary>
    /// Constructs a DataFrame representing the database table, partitioned into numPartitions
    /// ranges of columnName between lowerBound and upperBound.
    /// </summary>
    public IDataFrameProxy Jdbc(string url, string table, string columnName, string lowerBound, string upperBound, int numPartitions, Dictionary<string, string> connectionProperties)
    {
        return new DataFrameIpcProxy(new JvmObjectReference(
            SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
                jvmDataFrameReaderReference, "jdbc", new object[] { url, table, columnName, lowerBound, upperBound, numPartitions, connectionProperties }).ToString()),
            sqlContextProxy);
    }

    /// <summary>
    /// Loads the given Parquet file paths as a DataFrame.
    /// </summary>
    public IDataFrameProxy Parquet(string[] paths)
    {
        return new DataFrameIpcProxy(new JvmObjectReference(
            SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
                jvmDataFrameReaderReference, "parquet", new object[] { paths }).ToString()), sqlContextProxy);
    }

    /// <summary>
    /// Returns the specified table as a DataFrame.
    /// </summary>
    public IDataFrameProxy Table(string tableName)
    {
        return new DataFrameIpcProxy(new JvmObjectReference(
            SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
                jvmDataFrameReaderReference, "table", new object[] { tableName }).ToString()), sqlContextProxy);
    }
}
}
Original file line number Diff line number Diff line change
@@ -23,6 +23,12 @@ public SqlContextIpcProxy(JvmObjectReference jvmSqlContextReference)
this.jvmSqlContextReference = jvmSqlContextReference;
}

/// <summary>
/// Obtains the JVM DataFrameReader for this SQLContext and wraps its reference id
/// in a DataFrameReaderIpcProxy bound to this proxy.
/// </summary>
public IDataFrameReaderProxy Read()
{
    var readerRefId = SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSqlContextReference, "read").ToString();
    return new DataFrameReaderIpcProxy(new JvmObjectReference(readerRefId), this);
}

public IDataFrameProxy CreateDataFrame(IRDDProxy rddProxy, IStructTypeProxy structTypeProxy)
{
var rdd = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "byteArrayRDDToAnyArrayRDD",
14 changes: 14 additions & 0 deletions csharp/Adapter/Microsoft.Spark.CSharp/Sql/Column.cs
Original file line number Diff line number Diff line change
@@ -185,5 +185,19 @@ public Column Alias(string[] aliases)
{
return new Column(columnProxy.InvokeMethod("as", new object[] { aliases }));
}

/// <summary>
/// Casts the column to a different data type, using the canonical string representation
/// of the type. The supported types are: `string`, `boolean`, `byte`, `short`, `int`, `long`,
/// `float`, `double`, `decimal`, `date`, `timestamp`.
///
/// E.g.
/// // Casts colA to integer.
/// df.select(df("colA").cast("int"))
/// </summary>
public Column Cast(string to)
{
    // Delegate to the JVM column's "cast" method and wrap the resulting column proxy.
    var castColumnProxy = columnProxy.InvokeMethod("cast", new object[] { to });
    return new Column(castColumnProxy);
}
}
}
Loading

0 comments on commit fe55959

Please sign in to comment.