Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to use Deferred #11

Open
Enorio opened this issue Apr 9, 2018 · 4 comments
Open

How to use Deferred #11

Enorio opened this issue Apr 9, 2018 · 4 comments

Comments

@Enorio
Copy link

Enorio commented Apr 9, 2018

I'm having trouble understanding the flow of the Deferred class.
For example, this API is used in the AsyncKudu. How can I do this method?

public void foo(){
    //verify that the table exists;
    ...
    //if exists, open table;
    ...
    //write on the table;
   ...
}

More specifically, the AsyncKuduClient has this method to see if a table exists:

Deferred<Boolean> aux = kuduClient.tableExists(tableName);

What can I do after this?
And after that, how can I call this method asynchronously?

Thank you.

@Enorio
Copy link
Author

Enorio commented Apr 9, 2018

I did it. I can reach the kudu table and write what I want in there. Although I get this error

java.lang.AssertionError: Tried to resume the execution of Deferred@2103498087(state=DONE, result=true, callback=, errback=)) although it's not in state=PAUSED. This occurred after the completion of Deferred@316763603(state=RUNNING, result=Written with success., callback=, errback=) which was originally returned by callback=com.foo.kudu.KuduDAOImpl$$Lambda$34/1065552698@465c6144@1180459332

Any idea what this is?

@manolama
Copy link
Member

So for the first part, you use a Deferred like a Future so add a callback using one of the addCallback() methods and/or addErrorback(). Or call .join() on it.

For that error, it sounds like the deferred was executed twice. Make sure you are not calling aux.callback(some object) anywhere. Only the AsyncKuduClient should do that.

@Enorio
Copy link
Author

Enorio commented Apr 10, 2018

The only way I could reach the kudu database and write there was like this
this is basically callbacks inside callbacks inside callbacks :D

public void asyncWrite(String tableName, Foo foo) {
        Deferred<Deferred<Boolean>> res = new Deferred<>();

        res.addCallback(new Callback<Deferred<Deferred<String>>, Deferred<Boolean>>() {
            @Override
            public Deferred<Deferred<String>> call(Deferred<Boolean> booleanDeferred){
                return booleanDeferred.addCallbacks(new Callback<Deferred<String>, Boolean>() {
                    @Override
                    public Deferred<String> call(Boolean aBoolean) throws Exception {
                        if (aBoolean) {
                            Deferred<String> stringDeferred = kuduClient.openTable(tableName)
                                    .addCallbacks(new Callback<String, KuduTable>() {
                                        @Override
                                        public String call(KuduTable kuduTable) throws KuduException {
                                            AsyncKuduSession session = kuduClient.newSession();
                                            session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);

                                            Upsert upst = kuduTable.newUpsert();
                                            PartialRow row = upst.getRow();
                                            row.addString("house_id", foo.getId());
                                      
                                            session.apply(upst);
                                            session.close();
                                            LOGGER.info("Written with success.");
                                            return "Written with success.";
                                        }
                                    }, (Callback<Object, KuduException>) e -> {
                                        LOGGER.error(e.getMessage());
                                        return e;
                                    });
                            return stringDeferred;
                        } else {
                            throw new Exception("Table doesn't exists");
                        }

                    }
                }, (Callback<Object, Exception>) e -> {
                    System.out.println(e.getMessage());
                    return e;
                });
            }
        });

        //executing the callback
        res.callback(kuduClient.tableExists(tableName));
    }

@manolama
Copy link
Member

ahh, so if you're using https://kudu.apache.org/apidocs/org/apache/kudu/client/AsyncKuduClient.html then you need to attach your callbacks to those method calls. E.g. res.callback(kuduClient.tableExists(tableName)); should be

kuduClient.tableExists(tableName).
addCallback(new Callback<Object, Boolean>() {
  @Override
  public Object call(Boolean exists) {
     return kuduClient.openTable(tableName).addCallback(
           new Callback<Object, KuduTable>() {
       @Override
       public Object call(KuduTable kuduTable) throws KuduException {
         AsyncKuduSession session = kuduClient.newSession();
         session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
         Upsert upst = kuduTable.newUpsert();
         PartialRow row = upst.getRow();
         row.addString("house_id", foo.getId());
                                      
        session.apply(upst);
        session.close();
        LOGGER.info("Written with success.");
         return null;
       }
     )};
  }
});

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants