rename delta-exchange to delta-sharing and clean up codes
zsxwing committed May 10, 2021
1 parent 5658854 commit 45c4443
Showing 57 changed files with 1,202 additions and 2,045 deletions.
67 changes: 67 additions & 0 deletions README.md
@@ -0,0 +1,67 @@


[![Build and Test](https://github.com/delta-io/delta-sharing/actions/workflows/build-and-test.yml/badge.svg)](https://github.com/delta-io/delta-sharing/actions/workflows/build-and-test.yml)

### Install the Spark connector

```
build/sbt spark/publishLocal
```

Note: Make sure to delete the following directory whenever you change the Spark connector, because Spark's `--packages` option caches it.

```
rm -rf ~/.ivy2/cache/io.delta/delta-sharing-spark_2.12
```

### Install the Python pandas connector

```
cd python/
pip install -e .
```

Note: this installs the package from the current directory in editable mode. Make sure to stay in the `python` directory when testing it.

### Create a profile file

Save the following content in a file.

```
{
  "version": 1,
  "endpoint": "https://ec2-18-237-148-30.us-west-2.compute.amazonaws.com/delta-sharing/",
  "bearerToken": "dapi5e3574ec767ca1548ae5bbed1a2dc04d"
}
```

### Start PySpark

Make sure you are using PySpark 3.1.1 and are still in the `python` directory. Setting `SPARK_LOCAL_IP` works around network issues when connected to a VPN.

```
SPARK_LOCAL_IP=127.0.0.1 pyspark --packages io.delta:delta-sharing-spark_2.12:0.1.0-SNAPSHOT
```

### Try the following code

Make sure you are connected to the VPN. Note that `load_as_pandas` doesn't require PySpark.

```
>>> from delta_sharing import DeltaSharing
>>> delta = DeltaSharing("<the-profile-file>")
>>> delta.list_all_tables()
[Table(name='table1', share='share1', schema='default'), Table(name='table3', share='share1', schema='default'), Table(name='table2', share='share2', schema='default')]
>>> DeltaSharing.load_as_pandas('<the-profile-file>#share1.default.table1')
                 eventTime        date
0  2021-04-28 06:32:22.421  2021-04-28
1  2021-04-28 06:32:02.070  2021-04-28
>>> DeltaSharing.load_as_spark('<the-profile-file>#share1.default.table1').show()
+--------------------+----------+
| eventTime| date|
+--------------------+----------+
|2021-04-27 23:32:...|2021-04-28|
|2021-04-27 23:32:...|2021-04-28|
+--------------------+----------+
```

59 changes: 16 additions & 43 deletions build.sbt
@@ -18,18 +18,16 @@ import ReleaseTransformations._
import sbt.ExclusionRule

parallelExecution in ThisBuild := false
crossScalaVersions in ThisBuild := Seq("2.12.8", "2.11.12")

lazy val compileScalastyle = taskKey[Unit]("compileScalastyle")
lazy val testScalastyle = taskKey[Unit]("testScalastyle")

val sparkVersion = "3.0.2"
val sparkVersion = "3.1.1"
val hadoopVersion = "2.7.2"
val deltaVersion = "0.5.0"

lazy val commonSettings = Seq(
organization := "io.delta",
scalaVersion := "2.12.8",
scalaVersion := "2.12.10",
fork := true,
javacOptions ++= Seq("-source", "1.8", "-target", "1.8"),
scalacOptions += "-target:jvm-1.8",
@@ -41,7 +39,7 @@ lazy val commonSettings = Seq(
"-Dspark.sql.shuffle.partitions=5",
"-Ddelta.log.cacheSize=3",
"-Dspark.sql.sources.parallelPartitionDiscovery.parallelism=5",
"-Dspark.delta.exchange.client.sslTrustAll=true",
"-Dspark.delta.sharing.client.sslTrustAll=true",
"-Xmx1024m"
)
)
@@ -51,10 +49,10 @@ lazy val releaseSettings = Seq(
releaseCrossBuild := true,
licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0")),
pomExtra :=
<url>https://github.com/delta-io/delta-exchange</url>
<url>https://github.com/delta-io/delta-sharing</url>
<scm>
<url>git@github.com:delta-io/delta-exchange.git</url>
<connection>scm:git:git@github.com:delta-io/delta-exchange.git</connection>
<url>git@github.com:delta-io/delta-sharing.git</url>
<connection>scm:git:git@github.com:delta-io/delta-sharing.git</connection>
</scm>
<developers>
<developer>
@@ -114,21 +112,7 @@ releaseProcess := Seq[ReleaseStep](
)

lazy val root = (project in file("."))
.aggregate(client, spark, server)

lazy val client = (project in file("client")) settings(
name := "delta-exchange-client",
commonSettings,
releaseSettings,
libraryDependencies ++= Seq(
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.10.0",
"org.json4s" %% "json4s-jackson" % "3.6.6" excludeAll(
ExclusionRule("com.fasterxml.jackson.core"),
ExclusionRule("com.fasterxml.jackson.module")
),
"org.apache.httpcomponents" % "httpclient" % "4.5.13"
)
)
.aggregate(spark, server)

lazy val getInitialCommandsForConsole: Def.Initialize[String] = Def.settingDyn {
val base = """ println("Welcome to\n" +
@@ -145,7 +129,7 @@ lazy val getInitialCommandsForConsole: Def.Initialize[String] = Def.settingDyn {
| val conf = new org.apache.spark.SparkConf()
| .setMaster("local")
| .setAppName("Sbt console + Spark!")
| .set("spark.sql.extensions", "io.delta.exchange.sql.DeltaExchangeSparkSessionExtension")
| .set("spark.delta.sharing.client.sslTrustAll", "true")
| new org.apache.spark.SparkContext(conf)
|}
|sc.setLogLevel("WARN")
@@ -159,6 +143,8 @@ lazy val getInitialCommandsForConsole: Def.Initialize[String] = Def.settingDyn {
| println("SQL context available as sqlContext.")
| _sqlContext
|}
|val spark = sqlContext.sparkSession
|println("SparkSession available as spark.")
|import sqlContext.implicits._
|import sqlContext.sql
|import org.apache.spark.sql.functions._
@@ -169,37 +155,23 @@ lazy val getInitialCommandsForConsole: Def.Initialize[String] = Def.settingDyn {
}
}

lazy val spark = (project in file("spark")) enablePlugins(Antlr4Plugin) dependsOn(client) settings(
name := "delta-exchange-spark",
lazy val spark = (project in file("spark")) settings(
name := "delta-sharing-spark",
commonSettings,
releaseSettings,
antlr4Version in Antlr4 := "4.7",
antlr4PackageName in Antlr4 := Some("io.delta.exchange.sql.parser"),
antlr4GenListener in Antlr4 := true,
antlr4GenVisitor in Antlr4 := true,
initialCommands in console := getInitialCommandsForConsole.value,
cleanupCommands in console := "sc.stop()",
libraryDependencies ++= Seq(
"org.apache.parquet" % "parquet-hadoop" % "1.10.1" % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"io.delta" %% "delta-core" % "0.8.0" % "test",
"org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests",
"org.scalatest" %% "scalatest" % "3.0.5" % "test",

"org.apache.hadoop" % "hadoop-client" % "2.10.1",
"org.apache.hadoop" % "hadoop-common" % "2.10.1",
"org.apache.hadoop" % "hadoop-aws" % "2.10.1",
"org.apache.hadoop" % "hadoop-azure" % "2.10.1",

"com.google.cloud" % "google-cloud-storage" % "1.113.14",
"com.google.cloud.bigdataoss" % "gcs-connector" % "hadoop2-2.2.0"
"org.scalatest" %% "scalatest" % "3.2.3" % "test"
)
)

lazy val server = (project in file("server")) settings(
name := "delta-exchange-server",
name := "delta-sharing-server",
commonSettings,
releaseSettings,
libraryDependencies ++= Seq(
@@ -253,7 +225,8 @@ lazy val server = (project in file("server")) settings(
),
"com.fasterxml.jackson.core" % "jackson-core" % "2.6.7",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7.3",
"org.scalatest" %% "scalatest" % "3.0.5" % "test"
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"net.sourceforge.argparse4j" % "argparse4j" % "0.9.0"
),
Compile / PB.targets := Seq(
scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
12 changes: 0 additions & 12 deletions project/plugins.sbt
@@ -14,11 +14,8 @@
* limitations under the License.
*/


resolvers += Resolver.url("artifactory", url("https://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)

resolvers += "bintray-spark-packages" at "https://dl.bintray.com/spark-packages/maven/"

resolvers += "Typesafe Repository" at "https://repo.typesafe.com/typesafe/releases/"

resolvers += Resolver.url(
@@ -29,19 +26,10 @@ addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.13")

addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.6")

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")

addSbtPlugin("io.get-coursier" % "sbt-coursier" % "1.0.3")

addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")

addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.10.0-RC1")

addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.4.2")

addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.2")

addSbtPlugin("com.simplytyped" % "sbt-antlr4" % "0.8.3")

libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.1"

28 changes: 11 additions & 17 deletions python/delta_sharing/delta_sharing.py
@@ -14,7 +14,7 @@
# limitations under the License.
#
from itertools import chain
from typing import BinaryIO, Optional, Sequence, TextIO, Union
from typing import BinaryIO, Sequence, TextIO, Union
from pathlib import Path
from urllib.parse import urlparse

@@ -46,22 +46,6 @@ def list_all_tables(self) -> Sequence[Table]:
schemas = chain(*(self.list_schemas(share) for share in shares))
return list(chain(*(self.list_tables(schema) for schema in schemas)))

def load_as_pandas(
self,
table: Table,
*,
predicateHints: Optional[Sequence[str]] = None,
limitHint: Optional[int] = None
) -> pd.DataFrame:
reader = DeltaSharingReader(table=table, rest_client=self._rest_client)

if predicateHints is not None:
reader = reader.predicateHints(predicateHints)
if limitHint is not None:
reader = reader.limitHint(limitHint)

return reader.to_pandas()

@staticmethod
def load(url: str) -> DeltaSharingReader:
profile_json = url.split("#")[0]
@@ -77,3 +61,13 @@ def load(url: str) -> DeltaSharingReader:
table=Table(name=table, share=share, schema=schema),
rest_client=DataSharingRestClient(profile),
)

@staticmethod
def load_as_pandas(url: str) -> pd.DataFrame:
return DeltaSharing.load(url).to_pandas()

@staticmethod
def load_as_spark(url: str):
from pyspark.sql import SparkSession

return SparkSession.getActiveSession().read.format("deltaSharing").load(url)
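
For reference, `load_as_pandas` and `load_as_spark` are now static methods that take a profile URL instead of a `Table` object plus hint parameters. A minimal usage sketch, assuming a profile file named `profile.share` and the `share1.default.table1` table from the README (both names are illustrative):

```
from delta_sharing import DeltaSharing

# URL format: "<profile-file>#<share>.<schema>.<table>"
url = "profile.share#share1.default.table1"

# Fetches the table into a pandas DataFrame; no PySpark required.
pdf = DeltaSharing.load_as_pandas(url)

# Requires an active SparkSession with delta-sharing-spark on the classpath.
df = DeltaSharing.load_as_spark(url)
```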
2 changes: 1 addition & 1 deletion python/delta_sharing/protocol.py
@@ -43,7 +43,7 @@ def read_from_file(profile: Union[str, IO, Path]) -> "ShareProfile":
def from_json(json) -> "ShareProfile":
if isinstance(json, (str, bytes, bytearray)):
json = loads(json)
return ShareProfile(endpoint=json["endpoint"], token=json["token"])
return ShareProfile(endpoint=json["endpoint"], token=json["bearerToken"])


@dataclass(frozen=True)
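
The key rename above (`token` to `bearerToken`) matches the profile file format shown in the README; the parsed dataclass still exposes the value as `token`. A minimal sketch:

```
from delta_sharing.protocol import ShareProfile

# The JSON key is now "bearerToken", but the ShareProfile field is still `token`.
profile = ShareProfile.from_json(
    '{"endpoint": "https://localhost/delta-sharing/", "bearerToken": "secret"}'
)
assert profile.endpoint == "https://localhost/delta-sharing/"
assert profile.token == "secret"
```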
4 changes: 3 additions & 1 deletion python/delta_sharing/rest_client.py
@@ -58,6 +58,8 @@ def __init__(self, profile: ShareProfile):
self._session.headers.update({"Authorization": f"Bearer {profile.token}"})
if urlparse(profile.endpoint).netloc in ("localhost", "localhost:443"):
self._session.verify = False
# TODO Remove this. This is added for demo.
self._session.verify = False

def list_shares(
self, *, max_results: Optional[int] = None, page_token: Optional[str] = None
@@ -141,7 +143,7 @@ def list_files_in_table(
return ListFilesInTableResponse(
protocol=Protocol.from_json(protocol_json["protocol"]),
metadata=Metadata.from_json(metadata_json["metaData"]),
add_files=[AddFile.from_json(json.loads(file)["add"]) for file in lines],
add_files=[AddFile.from_json(json.loads(file)["file"]) for file in lines],
)

def close(self):
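
The `list_files_in_table` change reflects a wire-format rename: each data-file line in the server's NDJSON response is now wrapped in a `file` key rather than the Delta-log `add` key. A minimal parsing sketch (the inner field names are illustrative, not taken from this diff):

```
import json

# One NDJSON line as the client now expects it; inner fields are illustrative.
line = '{"file": {"url": "https://example.com/part-00000.snappy.parquet", "size": 1024}}'
file_json = json.loads(line)["file"]  # previously: json.loads(line)["add"]
print(file_json["url"], file_json["size"])
```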
3 changes: 0 additions & 3 deletions python/delta_sharing/tests/test_delta_sharing.py
@@ -116,6 +116,3 @@ def test_load(profile_path: str, fragments: str, table: Table, expected: pd.Data

pdf = reader.to_pandas()
pd.testing.assert_frame_equal(pdf, expected)

pdf = DeltaSharing(profile_path).load_as_pandas(table)
pd.testing.assert_frame_equal(pdf, expected)
2 changes: 1 addition & 1 deletion python/delta_sharing/tests/test_protocol.py
@@ -33,7 +33,7 @@ def test_share_profile(tmp_path):
json = """
{
"endpoint": "https://localhost/delta-sharing/",
"token": "token"
"bearerToken": "token"
}
"""
profile = ShareProfile.from_json(json)
7 changes: 5 additions & 2 deletions server/src/main/protobuf/protocol.proto
@@ -1,14 +1,17 @@
syntax = "proto2";

package io.delta.exchange.protocol;
package io.delta.sharing.server.protocol;

import "scalapb/scalapb.proto";

option java_package = "io.delta.exchange.protocol";
option java_package = "io.delta.sharing.server.protocol";

option java_generate_equals_and_hash = true;
option (scalapb.options).flat_package = true;

// Define the JSON objects used by REST APIs. The table metadata format is not defined in this file
// because it requires Map type which is not supported by Protocol Buffers Version 2.

message PageToken {
optional string id = 1;
}
