Skip to content

Commit

Permalink
Merge pull request HamaWhiteGG#57 from HamaWhiteGG/dev
Browse files Browse the repository at this point in the history
Support Flink SQL Agent
  • Loading branch information
HamaWhiteGG authored Jul 23, 2023
2 parents 80ca553 + c17b614 commit 47a56c6
Show file tree
Hide file tree
Showing 21 changed files with 892 additions and 22 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The following example can view in the [langchain-example](langchain-examples/src
- [SQL Chains](langchain-examples/src/main/java/com/hw/langchain/examples/chains/SqlChainExample.java)
- [API Chains](langchain-examples/src/main/java/com/hw/langchain/examples/chains/ApiChainExample.java)
- [Spark SQL Agent](langchain-bigdata/langchain-spark/src/test/java/com/hw/langchain/agents/toolkits/spark/sql/toolkit/SparkSqlToolkitTest.java)
- [Flink SQL Agent](langchain-bigdata/langchain-flink/src/test/java/com/hw/langchain/agents/toolkits/flink/sql/toolkit/FlinkSqlToolkitTest.java)
- [Agent with Google Search](langchain-examples/src/main/java/com/hw/langchain/examples/agents/LlmAgentExample.java)
- [Question answering over documents](langchain-examples/src/main/java/com/hw/langchain/examples/chains/RetrievalQaExample.java)
- [Context aware text splitting and QA](langchain-examples/src/main/java/com/hw/langchain/examples/chains/RetrievalMarkdownExample.java)
Expand Down
10 changes: 10 additions & 0 deletions docs/extras/modules/titanic_flink.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S
7,0,1,"McCarthy, Mr. Timothy J",male,54,0,0,17463,51.8625,E46,S
8,0,3,"Palsson, Master. Gosta Leonard",male,2,3,1,349909,21.075,,S
9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27,0,2,347742,11.1333,,S
10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",female,14,1,0,237736,30.0708,,C
11,1,3,"Sandstrom, Miss. Marguerite Rut",female,4,1,1,PP 9549,16.7,G6,S
55 changes: 55 additions & 0 deletions langchain-bigdata/langchain-flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,67 @@

<artifactId>langchain-flink</artifactId>

<properties>
<flink.version>1.17.1</flink.version>
</properties>

<dependencies>
<dependency>
<groupId>io.github.hamawhitegg</groupId>
<artifactId>langchain-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hw.langchain.agents.toolkits.flink.sql.base;

import com.hw.langchain.agents.agent.Agent;
import com.hw.langchain.agents.agent.AgentExecutor;
import com.hw.langchain.agents.mrkl.base.ZeroShotAgent;
import com.hw.langchain.agents.toolkits.flink.sql.toolkit.FlinkSqlToolkit;
import com.hw.langchain.base.language.BaseLanguageModel;
import com.hw.langchain.chains.llm.LLMChain;
import com.hw.langchain.prompts.prompt.PromptTemplate;
import com.hw.langchain.tools.base.BaseTool;

import java.util.List;
import java.util.Map;

import static com.hw.langchain.agents.mrkl.prompt.Prompt.FORMAT_INSTRUCTIONS;
import static com.hw.langchain.agents.toolkits.flink.sql.prompt.Prompt.SQL_PREFIX;
import static com.hw.langchain.agents.toolkits.flink.sql.prompt.Prompt.SQL_SUFFIX;
import static com.hw.langchain.prompts.utils.FormatUtils.formatTemplate;

/**
 * Static factory for {@link AgentExecutor} instances wired to the tools of a
 * {@link FlinkSqlToolkit}.
 *
 * @author HamaWhite
 */
public class FlinkSqlAgent {

    private FlinkSqlAgent() {
        // Only static factory methods live here; instantiation is rejected outright.
        throw new IllegalStateException("Utility class");
    }

    /**
     * Creates a Flink SQL agent with the default settings.
     *
     * <p>Delegates to the full overload with {@code SQL_PREFIX}, {@code SQL_SUFFIX} and
     * {@code FORMAT_INSTRUCTIONS} as prompt pieces, no explicit input variables, a
     * {@code top_k} of 10, 15 max iterations, no max execution time, and the
     * {@code "force"} early stopping method.
     *
     * @param llm     the language model handed to the agent's {@link LLMChain}
     * @param toolkit the toolkit whose tools the agent may use
     * @return the configured agent executor
     */
    public static AgentExecutor createFlinkSqlAgent(BaseLanguageModel llm, FlinkSqlToolkit toolkit) {
        final List<String> defaultInputVariables = null;
        final int defaultTopK = 10;
        final Integer defaultMaxIterations = 15;
        final Float defaultMaxExecutionTime = null;
        final String defaultEarlyStoppingMethod = "force";

        return createFlinkSqlAgent(
                llm,
                toolkit,
                SQL_PREFIX,
                SQL_SUFFIX,
                FORMAT_INSTRUCTIONS,
                defaultInputVariables,
                defaultTopK,
                defaultMaxIterations,
                defaultMaxExecutionTime,
                defaultEarlyStoppingMethod);
    }

    /**
     * Creates a Flink SQL agent with every setting supplied by the caller.
     *
     * @param llm                 the language model handed to the agent's {@link LLMChain}
     * @param toolkit             the toolkit whose tools the agent may use
     * @param prefix              prompt prefix template; its {@code top_k} placeholder is
     *                            filled with {@code topK} before the prompt is built
     * @param suffix              prompt suffix, passed to {@code ZeroShotAgent.createPrompt}
     * @param formatInstructions  format instructions, passed to {@code ZeroShotAgent.createPrompt}
     * @param inputVariables      input variables, passed to {@code ZeroShotAgent.createPrompt}
     *                            (the convenience overload passes {@code null})
     * @param topK                value substituted for {@code top_k} in {@code prefix}
     * @param maxIterations       forwarded unchanged to the executor builder
     * @param maxExecutionTime    forwarded unchanged to the executor builder
     * @param earlyStoppingMethod forwarded unchanged to the executor builder
     * @return the configured agent executor
     */
    @SuppressWarnings("all")
    public static AgentExecutor createFlinkSqlAgent(
            BaseLanguageModel llm,
            FlinkSqlToolkit toolkit,
            String prefix,
            String suffix,
            String formatInstructions,
            List<String> inputVariables,
            int topK,
            Integer maxIterations,
            Float maxExecutionTime,
            String earlyStoppingMethod) {
        List<BaseTool> sqlTools = toolkit.getTools();

        // Resolve the row-limit placeholder first so the prompt is built from the final prefix.
        String resolvedPrefix = formatTemplate(prefix, Map.of("top_k", topK));
        PromptTemplate agentPrompt =
                ZeroShotAgent.createPrompt(sqlTools, resolvedPrefix, suffix, formatInstructions, inputVariables);
        LLMChain chain = new LLMChain(llm, agentPrompt);

        // The agent is constructed with the names of the toolkit's tools.
        List<String> allowedToolNames = sqlTools.stream()
                .map(BaseTool::getName)
                .toList();
        Agent zeroShotAgent = new ZeroShotAgent(chain, allowedToolNames);

        return AgentExecutor.builder()
                .agent(zeroShotAgent)
                .tools(sqlTools)
                .maxIterations(maxIterations)
                .maxExecutionTime(maxExecutionTime)
                .earlyStoppingMethod(earlyStoppingMethod)
                .build();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hw.langchain.agents.toolkits.flink.sql.prompt;

/**
 * Prompt fragments used when building the Flink SQL agent prompt.
 *
 * @author HamaWhite
 */
public class Prompt {

    /**
     * Leading part of the agent prompt. Contains the {@code {top_k}} placeholder, which
     * must be substituted before use, and ends with a trailing newline.
     */
    public static final String SQL_PREFIX =
            """
            You are an agent designed to interact with Flink SQL.
            Given an input question, create a syntactically correct Flink SQL query to run, then look at the results of the query and return the answer.
            Unless the user specifies a specific number of examples they wish to obtain, always limit your query to at most {top_k} results.
            You can order the results by a relevant column to return the most interesting examples in the database.
            Never query for all the columns from a specific table, only ask for the relevant columns given the question.
            You have access to tools for interacting with the database.
            Only use the below tools. Only use the information returned by the below tools to construct your final answer.
            You MUST double check your query before executing it. If you get an error while executing a query, rewrite the query and try again.
            DO NOT make any DML statements (INSERT, UPDATE, DELETE, DROP etc.) to the database.
            If the question does not seem related to the database, just return "I don't know" as the answer.
            """;

    /**
     * Trailing part of the agent prompt. Contains the {@code {input}} and
     * {@code {agent_scratchpad}} placeholders and has no trailing newline.
     */
    public static final String SQL_SUFFIX =
            """
            Begin!
            Question: {input}
            Thought: I should look at the tables in the database to see what I can query.
            {agent_scratchpad}""";

    private Prompt() {
        // Constants holder: instantiation is rejected outright.
        throw new IllegalStateException("Utility class");
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hw.langchain.agents.toolkits.flink.sql.toolkit;

import com.hw.langchain.agents.toolkits.base.BaseToolkit;
import com.hw.langchain.base.language.BaseLanguageModel;
import com.hw.langchain.tools.base.BaseTool;
import com.hw.langchain.tools.flink.sql.tool.InfoFlinkSqlTool;
import com.hw.langchain.tools.flink.sql.tool.ListFlinkSqlTool;
import com.hw.langchain.tools.flink.sql.tool.QueryCheckerTool;
import com.hw.langchain.tools.flink.sql.tool.QueryFlinkSqlTool;
import com.hw.langchain.utilities.flink.sql.FlinkSql;

import java.util.List;

/**
 * Toolkit that bundles the tools used to interact with Flink SQL.
 *
 * @author HamaWhite
 */
public class FlinkSqlToolkit implements BaseToolkit {

    /** Flink SQL handle passed to every tool this toolkit creates. */
    private final FlinkSql flinkSql;

    /** Language model passed to the query checker tool only. */
    private final BaseLanguageModel languageModel;

    /**
     * Creates a toolkit over the given Flink SQL handle and language model.
     *
     * @param db  the Flink SQL handle given to each tool
     * @param llm the language model given to {@link QueryCheckerTool}
     */
    public FlinkSqlToolkit(FlinkSql db, BaseLanguageModel llm) {
        this.flinkSql = db;
        this.languageModel = llm;
    }

    /**
     * Builds a fresh set of tools on every call.
     *
     * @return an unmodifiable list holding, in order, the query, info, list and
     *         query-checker tools
     */
    @Override
    public List<BaseTool> getTools() {
        BaseTool queryTool = new QueryFlinkSqlTool(flinkSql);
        BaseTool infoTool = new InfoFlinkSqlTool(flinkSql);
        BaseTool listTool = new ListFlinkSqlTool(flinkSql);
        BaseTool checkerTool = new QueryCheckerTool(flinkSql, languageModel);
        return List.of(queryTool, infoTool, listTool, checkerTool);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hw.langchain.tools.flink.sql.prompt;

/**
 * Prompt text used by the Flink SQL tools.
 *
 * @author HamaWhite
 */
public class Prompt {

    /**
     * Template asking for a review of a Flink SQL query against a checklist of common
     * mistakes. Contains the {@code {query}} placeholder on its first line and has no
     * trailing newline.
     */
    public static final String QUERY_CHECKER =
            """
            {query}
            Double check the Flink SQL query above for common mistakes, including:
            - Using NOT IN with NULL values
            - Using UNION when UNION ALL should have been used
            - Using BETWEEN for exclusive ranges
            - Data type mismatch in predicates
            - Properly quoting identifiers
            - Using the correct number of arguments for functions
            - Casting to the correct data type
            - Using the proper columns for joins
            If there are any of the above mistakes, rewrite the query. If there are no mistakes, just reproduce the original query.""";

    private Prompt() {
        // Constants holder: instantiation is rejected outright.
        throw new IllegalStateException("Utility class");
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hw.langchain.tools.flink.sql.tool;

import com.hw.langchain.tools.base.BaseTool;
import com.hw.langchain.utilities.flink.sql.FlinkSql;

import lombok.EqualsAndHashCode;

/**
 * Common superclass for tools that operate on a {@link FlinkSql} handle.
 *
 * <p>Lombok generates {@code equals}/{@code hashCode} for this class; with
 * {@code callSuper = true} the generated methods also call the {@link BaseTool}
 * implementations.
 *
 * @author HamaWhite
 */
@EqualsAndHashCode(callSuper = true)
public abstract class BaseFlinkSqlTool extends BaseTool {

    /** Flink SQL handle, exposed to subclasses. */
    protected final FlinkSql db;

    /**
     * Stores the Flink SQL handle and forwards the tool metadata to {@link BaseTool}.
     *
     * @param db          the Flink SQL handle kept in {@link #db}
     * @param name        tool name, passed to the superclass constructor
     * @param description tool description, passed to the superclass constructor
     */
    protected BaseFlinkSqlTool(FlinkSql db, String name, String description) {
        super(name, description);
        this.db = db;
    }
}
Loading

0 comments on commit 47a56c6

Please sign in to comment.