Skip to content

Commit

Permalink
Add apache cassandra certification tests (dapr#1985)
Browse files Browse the repository at this point in the history
* Add apache cassandra certification tests

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* go mod tidy

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* add cassandra to pr

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* update mod

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* wait for sidecar

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* shorten time

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* uncomment

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* tidy

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* fix docker reset

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* remove caps

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* update copyright date

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* go mod tidy

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* tidy

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* go-sdk

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* add component changes

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* update tests

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* typo

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* updates

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* update readme

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* go mod tidy

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* go mod tidy

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

* uncomment network

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>

Signed-off-by: Addison Juarez <adjuarez@microsoft.com>
Co-authored-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>
Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 14, 2022
1 parent d4f2b10 commit 8184814
Show file tree
Hide file tree
Showing 14 changed files with 1,895 additions and 2 deletions.
1 change: 1 addition & 0 deletions .github/workflows/certification.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ jobs:
- state.mongodb
- state.redis
- state.postgresql
- state.cassandra
- bindings.alicloud.dubbo
- bindings.kafka
- secretstores.local.env
Expand Down
65 changes: 65 additions & 0 deletions tests/certification/state/cassandra/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@

## Test for TTL
1. TTL not expiring
2. TTL not a valid number
3. TTL Expires as expected
- Provide a TTL of 5 second
- Fetch this record just after saving
- Sleep for 5 seconds
- Try to fetch again after a gap of 5 seconds, record shouldn't be deleted

## Connection Recovery

1. When Cassandra goes down and then comes back up the client is able to reconnect

## Test Metadata Fields
1. Verify `port` attribute is used
- set port to non default value
- run dapr application with component
- component should successfully initialize

2. Verify `keyspace` attribute is used
- set keyspace to non-default value
- run dapr application with component
- component should successfully initialize and create keyspace

3. Verify `table` attribute is used
- set table to non-default value
- run dapr application with component
- component should successfully initialize and create table
- successfully run query on table

4. Verify `protoVersion` attribute is used
- set protoVersion to non-default value 0
- run dapr application with component
- cassandra client itself should detect version from cluster if protoVersion == 0
- component should successfully initialize
- run queries to verify

5. Verify `protoVersion` attribute is used -negative test
- set protoVersion to non-default value 1
- run dapr application with component
- component should recieve errors on queries

6. Verify `replicationFactor` attribute is used
- set replicationFactor to non-default value 2
- run dapr application with component using 2 nodes
- component should successfully initialize
- run queries to verify

7. Verify `replicationFactor` attribute is used - negative test
- set replicationFactor to non-default value 2
- run dapr application with component using 1 node
- component should recieve errors on queries

8. Verify `consistency` attribute is used - negative test
- set consistency to non-default value "Three"
- run dapr application with component
- component should successfully initialize
- run queries and see failure due to less than 3 nodes available

9. Verify `consistency` attribute is used
- set consistency to non-default value "Two"
- run dapr application with component
- component should successfully initialize
- run queries successfully
312 changes: 312 additions & 0 deletions tests/certification/state/cassandra/cassandra_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cassandra_test

import (
"fmt"
"github.com/dapr/components-contrib/state"
state_cassandra "github.com/dapr/components-contrib/state/cassandra"
"github.com/dapr/components-contrib/tests/certification/embedded"
"github.com/dapr/components-contrib/tests/certification/flow"
"github.com/dapr/components-contrib/tests/certification/flow/dockercompose"
"github.com/dapr/components-contrib/tests/certification/flow/network"
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
state_loader "github.com/dapr/dapr/pkg/components/state"
"github.com/dapr/dapr/pkg/runtime"
dapr_testing "github.com/dapr/dapr/pkg/testing"
goclient "github.com/dapr/go-sdk/client"
"github.com/dapr/kit/logger"
"github.com/stretchr/testify/assert"
"strconv"
"testing"
"time"
)

const (
sidecarNamePrefix = "cassandra-sidecar-"
dockerComposeYAMLCLUSTER = "docker-compose-cluster.yml"
dockerComposeYAML = "docker-compose-single.yml"

stateStoreName = "statestore"
stateStoreCluster = "statestorecluster"
stateStoreClusterFail = "statestoreclusterfail"
stateStoreVersionFail = "statestoreversionfail"
stateStoreFactorFail = "statestorefactorfail"

certificationTestPrefix = "stable-certification-"
stateStoreNoConfigError = "error saving state: rpc error: code = FailedPrecondition desc = state store is not configured"
)

func TestCassandra(t *testing.T) {
log := logger.NewLogger("dapr.components")
stateStore := state_cassandra.NewCassandraStateStore(log).(*state_cassandra.Cassandra)
ports, err := dapr_testing.GetFreePorts(2)
assert.NoError(t, err)

stateRegistry := state_loader.NewRegistry()
stateRegistry.Logger = log
stateRegistry.RegisterComponent(func(l logger.Logger) state.Store {
return stateStore
}, "cassandra")

currentGrpcPort := ports[0]
currentHTTPPort := ports[1]

basicTest := func(ctx flow.Context) error {
client, err := goclient.NewClientWithPort(fmt.Sprint(currentGrpcPort))
if err != nil {
panic(err)
}
defer client.Close()

err = client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key1", []byte("cassandraCert"), nil)
assert.NoError(t, err)

// get state
item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"key1", nil)
assert.NoError(t, err)
assert.Equal(t, "cassandraCert", string(item.Value))

errUpdate := client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key1", []byte("cassandraCertUpdate"), nil)
assert.NoError(t, errUpdate)
item, errUpdatedGet := client.GetState(ctx, stateStoreName, certificationTestPrefix+"key1", nil)
assert.NoError(t, errUpdatedGet)
assert.Equal(t, "cassandraCertUpdate", string(item.Value))

// delete state
err = client.DeleteState(ctx, stateStoreName, certificationTestPrefix+"key1", nil)
assert.NoError(t, err)

return nil
}

// Time-To-Live Test
timeToLiveTest := func(ctx flow.Context) error {
client, err := goclient.NewClientWithPort(fmt.Sprint(currentGrpcPort))
if err != nil {
panic(err)
}
defer client.Close()

ttlInSecondsWrongValue := "mock value"
mapOptionsWrongValue :=
map[string]string{
"ttlInSeconds": ttlInSecondsWrongValue,
}

ttlInSecondsNonExpiring := 0
mapOptionsNonExpiring :=
map[string]string{
"ttlInSeconds": strconv.Itoa(ttlInSecondsNonExpiring),
}

ttlInSeconds := 5
mapOptions :=
map[string]string{
"ttlInSeconds": strconv.Itoa(ttlInSeconds),
}

err1 := client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl1", []byte("cassandraCert"), mapOptionsWrongValue)
assert.Error(t, err1)
err2 := client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl2", []byte("cassandraCert2"), mapOptionsNonExpiring)
assert.NoError(t, err2)
err3 := client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl3", []byte("cassandraCert3"), mapOptions)
assert.NoError(t, err3)

// get state
item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"ttl3", nil)
assert.NoError(t, err)
assert.Equal(t, "cassandraCert3", string(item.Value))
time.Sleep(5 * time.Second)
//entry should be expired now
itemAgain, errAgain := client.GetState(ctx, stateStoreName, certificationTestPrefix+"ttl3", nil)
assert.NoError(t, errAgain)
assert.Nil(t, nil, itemAgain)

return nil
}

testGetAfterCassandraRestart := func(ctx flow.Context) error {
client, err := goclient.NewClientWithPort(fmt.Sprint(currentGrpcPort))
if err != nil {
panic(err)
}
defer client.Close()

// get state
item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"ttl2", nil)
assert.NoError(t, err)
assert.Equal(t, "cassandraCert2", string(item.Value))

return nil
}

failTest := func(ctx flow.Context) error {
client, err := goclient.NewClientWithPort(fmt.Sprint(currentGrpcPort + 2))
if err != nil {
panic(err)
}
defer client.Close()

//should fail due to lack of replicas
err = client.SaveState(ctx, stateStoreFactorFail, certificationTestPrefix+"key1", []byte("cassandraCert"), nil)
assert.Error(t, err)

return nil
}

failVerTest := func(ctx flow.Context) error {
client, err := goclient.NewClientWithPort(fmt.Sprint(currentGrpcPort + 4))
if err != nil {
panic(err)
}
defer client.Close()
// should fail due to unsupported version
err = client.SaveState(ctx, stateStoreVersionFail, certificationTestPrefix+"key1", []byte("cassandraCert"), nil)
assert.Error(t, err)

return nil
}

flow.New(t, "Connecting cassandra And Ports and Verifying TTL and network tests and table creation").
Step(dockercompose.Run("cassandra", dockerComposeYAML)).
Step("wait", flow.Sleep(80*time.Second)).
Step(sidecar.Run(sidecarNamePrefix+"dockerDefault",
embedded.WithoutApp(),
embedded.WithDaprGRPCPort(currentGrpcPort),
embedded.WithDaprHTTPPort(currentHTTPPort),
embedded.WithComponentsPath("components/docker/default"),
runtime.WithStates(stateRegistry),
)).
Step("wait", flow.Sleep(30*time.Second)).
Step("Run TTL related test", timeToLiveTest).
Step("interrupt network",
network.InterruptNetwork(10*time.Second, nil, nil, "9044:9042")).
//Component should recover at this point.
Step("wait", flow.Sleep(30*time.Second)).
Step("Run basic test again to verify reconnection occurred", basicTest).
Step("stop cassandra server", dockercompose.Stop("cassandra", dockerComposeYAML, "cassandra")).
Step("start cassandra server", dockercompose.Start("cassandra", dockerComposeYAML, "cassandra")).
Step("wait", flow.Sleep(60*time.Second)).
Step("Get Values Saved Earlier And Not Expired, after Cassandra restart", testGetAfterCassandraRestart).
Step("Run basic test", basicTest).
Step(sidecar.Run(sidecarNamePrefix+"dockerDefault2",
embedded.WithoutApp(),
embedded.WithProfilePort(runtime.DefaultProfilePort+2),
embedded.WithDaprGRPCPort(currentGrpcPort+2),
embedded.WithDaprHTTPPort(currentHTTPPort+2),
embedded.WithComponentsPath("components/docker/defaultfactorfail"),
runtime.WithStates(stateRegistry),
)).
Step("wait", flow.Sleep(30*time.Second)).
Step("Run replication factor fail test", failTest).
Step(sidecar.Run(sidecarNamePrefix+"dockerDefault3",
embedded.WithoutApp(),
embedded.WithProfilePort(runtime.DefaultProfilePort+4),
embedded.WithDaprGRPCPort(currentGrpcPort+4),
embedded.WithDaprHTTPPort(currentHTTPPort+4),
embedded.WithComponentsPath("components/docker/defaultverisonfail"),
runtime.WithStates(stateRegistry),
)).
Step("wait", flow.Sleep(30*time.Second)).
Step("Run replication factor fail test", failVerTest).
Run()

}

func TestCluster(t *testing.T) {
log := logger.NewLogger("dapr.components")
stateStore := state_cassandra.NewCassandraStateStore(log).(*state_cassandra.Cassandra)
ports, err := dapr_testing.GetFreePorts(2)
assert.NoError(t, err)

currentGrpcPort := ports[0]
currentHTTPPort := ports[1]

stateRegistry := state_loader.NewRegistry()
stateRegistry.Logger = log
stateRegistry.RegisterComponent(func(l logger.Logger) state.Store {
return stateStore
}, "cassandra")

basicTest := func(ctx flow.Context) error {
client, err := goclient.NewClientWithPort(fmt.Sprint(currentGrpcPort))
if err != nil {
panic(err)
}
defer client.Close()

err = client.SaveState(ctx, stateStoreCluster, certificationTestPrefix+"key1", []byte("cassandraCert"), nil)
assert.NoError(t, err)

// get state
item, err := client.GetState(ctx, stateStoreCluster, certificationTestPrefix+"key1", nil)
assert.NoError(t, err)
assert.Equal(t, "cassandraCert", string(item.Value))

errUpdate := client.SaveState(ctx, stateStoreCluster, certificationTestPrefix+"key1", []byte("cassandraCertUpdate"), nil)
assert.NoError(t, errUpdate)
item, errUpdatedGet := client.GetState(ctx, stateStoreCluster, certificationTestPrefix+"key1", nil)
assert.NoError(t, errUpdatedGet)
assert.Equal(t, "cassandraCertUpdate", string(item.Value))

// delete state
err = client.DeleteState(ctx, stateStoreCluster, certificationTestPrefix+"key1", nil)
assert.NoError(t, err)

return nil
}

failTest := func(ctx flow.Context) error {
client, err := goclient.NewClientWithPort(fmt.Sprint(currentGrpcPort + 2))
if err != nil {
panic(err)
}
defer client.Close()

err = client.SaveState(ctx, stateStoreClusterFail, certificationTestPrefix+"key1", []byte("cassandraCert"), nil)
assert.NoError(t, err)

// get state
_, err = client.GetStateWithConsistency(ctx, stateStoreClusterFail, certificationTestPrefix+"key1", nil, goclient.StateConsistencyUndefined)
assert.Error(t, err)

return nil
}

flow.New(t, "Connecting cassandra And Verifying port/tables/keyspaces/consistency").
Step(dockercompose.Run("cassandra", dockerComposeYAMLCLUSTER)).
Step("wait", flow.Sleep(80*time.Second)).
Step(sidecar.Run(sidecarNamePrefix+"dockerDefault",
embedded.WithoutApp(),
embedded.WithDaprGRPCPort(currentGrpcPort),
embedded.WithDaprHTTPPort(currentHTTPPort),
embedded.WithComponentsPath("components/docker/cluster"),
runtime.WithStates(stateRegistry),
)).
Step("wait", flow.Sleep(30*time.Second)).
Step("Run basic test", basicTest).
Step(sidecar.Run(sidecarNamePrefix+"dockerDefault2",
embedded.WithoutApp(),
embedded.WithDaprGRPCPort(currentGrpcPort+2),
embedded.WithDaprHTTPPort(currentHTTPPort+2),
embedded.WithComponentsPath("components/docker/cluster-fail"),
embedded.WithProfilePort(runtime.DefaultProfilePort+2),
runtime.WithStates(stateRegistry),
)).
Step("wait", flow.Sleep(30*time.Second)).
Step("Run consistency fail test", failTest).
Run()

}
Loading

0 comments on commit 8184814

Please sign in to comment.