diff --git a/src/dderlodpi.erl b/src/dderlodpi.erl index 6a7dee38..79c1be3e 100644 --- a/src/dderlodpi.erl +++ b/src/dderlodpi.erl @@ -5,20 +5,35 @@ %% API -export([ - exec/3, % ✓ - exec/4, % ✓ - change_password/4, % ✓ + exec/3, % TODO: Cover all this functions with test cases. + exec/4, % + change_password/4, % add_fsm/2, % fetch_recs_async/3, % fetch_close/1, % filter_and_sort/6, % close/1, % close_port/1, % - run_table_cmd/3, % ✓ + run_table_cmd/3, % cols_to_rec/2, % get_alias/1, % fix_row_format/4, % - create_rowfun/3 % + create_rowfun/3 +]). + +%% helper functions for odpi_stmt +-export([ + dpi_conn_prepareStmt/2, + dpi_conn_commit/1, + dpi_conn_rollback/1, + dpi_conn_newVar/2, + dpi_stmt_bindByName/4, + dpi_stmt_execute/3, + dpi_stmt_executeMany/4, + dpi_var_set_many/3, + dpi_stmt_close/2, + dpi_var_release/2, + dpi_data_release/2 ]). %% gen_server callbacks @@ -723,7 +738,7 @@ translate_datatype(Stmt, [Number | RestRow], [#stmtCol{type = Type} | RestCols]) Type =:= 'DPI_ORACLE_TYPE_NUMBER'; Type =:= 'DPI_ORACLE_TYPE_NATIVE_DOUBLE'; Type =:= 'DPI_ORACLE_TYPE_NATIVE_FLOAT' -> - Result = dderloci_utils:clean_dynamic_prec(float_to_binary(Number, [{decimals,20}, compact])), + Result = dderloci_utils:clean_dynamic_prec(number_to_binary(Number)), [Result | translate_datatype(Stmt, RestRow, RestCols)]; translate_datatype(Stmt, [{_Pointer, Size, Path, Name} | RestRow], [#stmtCol{type = 'SQLT_BFILEE'} | RestCols]) -> SizeBin = integer_to_binary(Size), @@ -798,19 +813,24 @@ fix_format(Stmt, [Cell | RestRow], [#stmtCol{} | RestCols]) -> [Cell | fix_format(Stmt, RestRow, RestCols)]. -spec run_table_cmd(tuple(), atom(), binary()) -> ok | {error, term()}. %% %% !! Fix this to properly use statements. -run_table_cmd({oci_port, _, _} = _Connection, restore_table, _TableName) -> {error, <<"Command not implemented">>}; -run_table_cmd({oci_port, _, _} = _Connection, snapshot_table, _TableName) -> {error, <<"Command not implemented">>}; -run_table_cmd({oci_port, _, _} = Connection, truncate_table, TableName) -> +run_table_cmd(#odpi_conn{}, restore_table, _TableName) -> {error, <<"Command not implemented">>}; +run_table_cmd(#odpi_conn{}, snapshot_table, _TableName) -> {error, <<"Command not implemented">>}; +run_table_cmd(#odpi_conn{} = Connection, truncate_table, TableName) -> run_table_cmd(Connection, iolist_to_binary([<<"truncate table ">>, TableName])); -run_table_cmd({oci_port, _, _} = Connection, drop_table, TableName) -> +run_table_cmd(#odpi_conn{} = Connection, drop_table, TableName) -> run_table_cmd(Connection, iolist_to_binary([<<"drop table ">>, TableName])). -spec run_table_cmd(reference(), binary()) -> ok | {error, term()}. run_table_cmd(Connection, SqlCmd) -> - Stmt = dpi:conn_prepareStmt(Connection, false, SqlCmd, <<"">>), - dpi:stmt_execute(Stmt, []), - dpi:stmt_release(Stmt), - ok. + Stmt = dpi_conn_prepareStmt(Connection, SqlCmd), + Result = case dpi_stmt_execute(Connection, Stmt) of + 0 -> ok; % 0 rows available is the success response. + Error -> + ?Error("Error running table command ~p, result ~p", [SqlCmd, Error]), + {error, <<"Table command failed">>} + end, + dpi_stmt_close(Connection, Stmt), + Result. -spec find_original_field(binary(), list()) -> {binary(), boolean(), list()}. find_original_field(Alias, []) -> {Alias, false, []}; @@ -852,7 +872,6 @@ parse_sql({ok, [{{'begin procedure', _},_}]}, Sql) -> parse_sql(_UnsuportedSql, Sql) -> {Sql, Sql, <<"">>, false, []}. - %%%% Dpi data helper functions dpi_to_dderltime(#{day := Day, month := Month, year := Year, hour := Hour, minute := Min, second := Sec}) -> @@ -897,24 +916,58 @@ pad(IntValue) -> Value = integer_to_list(IntValue), pad(Value, 2). +number_to_binary(Int) when is_integer(Int) -> integer_to_binary(Int); +number_to_binary(Float) -> float_to_binary(Float, [{decimals,20}, compact]). + %%%% Dpi safe functions executed on dpi slave node dpi_conn_prepareStmt(#odpi_conn{node = Node, connection = Conn}, Sql) -> dpi:safe(Node, fun() -> dpi:conn_prepareStmt(Conn, false, Sql, <<"">>) end). -dpi_stmt_execute(#odpi_conn{node = Node, connection = Conn}, Stmt) -> +dpi_conn_commit(#odpi_conn{node = Node, connection = Conn}) -> + dpi:safe(Node, fun() -> dpi:conn_commit(Conn) end). + +dpi_conn_rollback(#odpi_conn{node = Node, connection = Conn}) -> + dpi:safe(Node, fun() -> dpi:conn_rollback(Conn) end). + +%% TODO: Probably oracle type should be given... +dpi_conn_newVar(#odpi_conn{node = Node, connection = Conn}, Count) -> dpi:safe(Node, fun() -> - Result = dpi:stmt_execute(Stmt, []), - ok = dpi:conn_commit(Conn), % Commit automatically for any dderl query. - Result + #{var := Var, data := DataList} = + dpi:conn_newVar( + Conn, 'DPI_ORACLE_TYPE_VARCHAR', 'DPI_NATIVE_TYPE_BYTES', Count, 4000, + false, false, null + ), + {Var, DataList} end). +dpi_stmt_bindByName(#odpi_conn{node = Node}, Stmt, Name, Var) -> + dpi:safe(Node, fun() -> dpi:stmt_bindByName(Stmt, Name, Var) end). + +dpi_stmt_execute(Connection, Stmt) -> + % Commit automatically for any dderl queries. + dpi_stmt_execute(Connection, Stmt, ['DPI_MODE_EXEC_COMMIT_ON_SUCCESS']). + +dpi_stmt_execute(#odpi_conn{node = Node}, Stmt, Mode) -> + dpi:safe(Node, fun() -> dpi:stmt_execute(Stmt, Mode) end). + +dpi_stmt_executeMany(#odpi_conn{node = Node}, Stmt, Count, Mode) -> + dpi:safe(Node, fun() -> dpi:stmt_executeMany(Stmt, Mode, Count) end). + dpi_stmt_getInfo(#odpi_conn{node = Node}, Stmt) -> dpi:safe(Node, fun() -> dpi:stmt_getInfo(Stmt) end). dpi_stmt_close(#odpi_conn{node = Node}, Stmt) -> dpi:safe(Node, fun() -> dpi:stmt_close(Stmt) end). +dpi_var_release(#odpi_conn{node = Node}, Var) -> + dpi:safe(Node, fun() -> dpi:var_release(Var) end). + +dpi_data_release(#odpi_conn{node = Node}, DataList) when is_list(DataList) -> + dpi:safe(Node, fun() -> [dpi:data_release(Data) || Data <- DataList] end); +dpi_data_release(#odpi_conn{node = Node}, Data) -> + dpi:safe(Node, fun() -> dpi:data_release(Data) end). + % This is not directly dpi but seems this is the best place to declare as it is rpc... dpi_query_columns(#odpi_conn{node = Node}, Stmt, NColumns) -> dpi:safe(Node, fun() -> get_column_info(Stmt, 1, NColumns) end). @@ -945,3 +998,19 @@ get_column_values(Stmt, ColIdx, Limit) -> Value = dpi:data_get(Data), dpi:data_release(Data), [Value | get_column_values(Stmt, ColIdx + 1, Limit)]. + +% Helper function to avoid many rpc calls when binding a list of variables. +dpi_var_set_many(#odpi_conn{node = Node}, Vars, Rows) -> + dpi:safe(Node, fun() -> var_bind_many(Vars, Rows, 0) end). + +var_bind_many(_Vars, [], _) -> ok; +var_bind_many(Vars, [Row | Rest], Idx) -> + ok = var_bind_row(Vars, Row, Idx), + var_bind_many(Vars, Rest, Idx + 1). + +var_bind_row([], [], _Idx) -> ok; +var_bind_row([], _Row, _Idx) -> {error, <<"Bind variables does not match the given data">>}; +var_bind_row(_Vars, [], _Idx) -> {error, <<"Bind variables does not match the given data">>}; +var_bind_row([Var | RestVars], [Bytes | RestRow], Idx) -> + ok = dpi:var_setFromBytes(Var, Idx, Bytes), + var_bind_row(RestVars, RestRow, Idx). diff --git a/src/dderlodpi_stmt.erl b/src/dderlodpi_stmt.erl new file mode 100644 index 00000000..1ef4e683 --- /dev/null +++ b/src/dderlodpi_stmt.erl @@ -0,0 +1,509 @@ +-module(dderlodpi_stmt). +-behaviour(gen_server). + +-include("dderlodpi.hrl"). + +-export([prepare/4, + execute/1]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-define(NoCommit, 0). +-define(AutoCommit, 1). + +-record(stmt, {columns = [], + del_rows = [], + upd_rows = [], + ins_rows = [], + del_stmt, + upd_stmt, + ins_stmt, + connection}). + +-record(row, {index, id, pos, op, values}). + +-record(binds, {stmt, var, data}). + +%%% API Implementation + +-spec prepare(binary(), list(), tuple(), list()) -> {error, term()} | {ok, pid()}. +prepare(TableName, ChangeList, Connection, Columns) -> + gen_server:start(?MODULE, [TableName, ChangeList, Connection, Columns], [{timeout, ?InitTimeout}]). + +-spec execute(pid()) -> {error, term()} | list(). +execute(Pid) -> + gen_server:call(Pid, execute, ?ExecTimeout). + +%%% gen_server callbacks +init([TableName, ChangeList, Connection, Columns]) -> + case create_stmts(TableName, Connection, ChangeList, Columns) of + {ok, Stmt} -> {ok, Stmt}; + {error, Error} -> {stop, Error} + end. + +handle_call(execute, _From, #stmt{columns = Columns, connection = Connection} = Stmt) -> + try + case process_delete(Connection, Stmt#stmt.del_stmt, Stmt#stmt.del_rows, Columns) of + {ok, DeleteChangeList} -> + case process_update(Stmt#stmt.upd_stmt, Stmt#stmt.upd_rows, Columns) of + {ok, UpdateChangeList} -> + case process_insert(Stmt#stmt.ins_stmt, Stmt#stmt.ins_rows, Columns) of + {ok, InsertChangeList} -> + dderlodpi:dpi_conn_commit(Connection), + {stop, normal, DeleteChangeList ++ UpdateChangeList ++ InsertChangeList, Stmt}; + Error -> + dderlodpi:dpi_conn_rollback(Connection), + {stop, normal, Error, Stmt} + end; + Error -> + dderlodpi:dpi_conn_rollback(Connection), + {stop, normal, Error, Stmt} + end; + Error -> + dderlodpi:dpi_conn_rollback(Connection), + {stop, normal, Error, Stmt} + end + catch _Class:Error2 -> + dderlodpi:dpi_conn_rollback(Connection), + {stop, normal, {error, Error2}, Stmt} + end. + +handle_cast(_Ignored, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, #stmt{del_stmt=DelStmt, upd_stmt=UpdStmt, ins_stmt=InsStmt, connection=Conn}) -> + %% Delete is not a list since it is always only one. + close_stmts(Conn, [Stmt || Stmt <- lists:flatten([DelStmt,UpdStmt,InsStmt]), Stmt =/= undefined]). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%% Private functions + +-spec table_name(binary() | {as, binary(), binary()}) -> binary(). +table_name(TableName) when is_binary(TableName) -> TableName; +table_name({as, TableName, Alias}) -> iolist_to_binary([TableName, " ", Alias]). + +-spec alias_name(binary() | {as, binary(), binary()}) -> binary(). +alias_name(TableName) when is_binary(TableName) -> TableName; +alias_name({as, _TableName, Alias}) -> Alias. + +-spec create_stmts(binary(), tuple(), list(), list()) -> {ok, #stmt{}} | {error, term()}. +create_stmts(TableName, Connection, ChangeList, Columns) -> + {DeleteList, UpdateListTotal, InsertListTotal} = split_changes(ChangeList), + UpdateList = split_by_columns_mod(UpdateListTotal, Columns, []), + InsertList = split_by_non_empty(InsertListTotal, []), + case create_stmts([{del, DeleteList}, {upd, UpdateList}, {ins, InsertList}], TableName, Connection, Columns, []) of + {ok, Stmt} -> + {ok, Stmt#stmt{del_rows = DeleteList, upd_rows = UpdateList, ins_rows = InsertList, connection = Connection}}; + Error -> + Error + end. + +-spec create_stmts([{atom(), list()}], binary(), tuple(), list(), list()) -> {ok, #stmt{}} | {error, term()}. +create_stmts([], _TableName, _Connection, _Columns, []) -> + {error, <<"empty change list">>}; +create_stmts([], _TableName, _Connection, Columns, ResultStmts) -> + DelStmt = proplists:get_value(del, ResultStmts), + UpdStmt = proplists:get_value(upd, ResultStmts), + InsStmt = proplists:get_value(ins, ResultStmts), + {ok, #stmt{columns = Columns, del_stmt = DelStmt, upd_stmt = UpdStmt, ins_stmt = InsStmt}}; +create_stmts([{del, []} | Rest], TableName, Connection, Columns, ResultStmt) -> + create_stmts(Rest, TableName, Connection, Columns, ResultStmt); +create_stmts([{del, DelList} | Rest], TableName, Connection, Columns, ResultStmt) -> + Sql = iolist_to_binary([<<"delete from ">>, table_name(TableName), " where ", alias_name(TableName), ".ROWID = :IDENTIFIER"]), + case dderlodpi:dpi_conn_prepareStmt(Connection, Sql) of + Stmt when is_reference(Stmt) -> + {Var, DataList} = dderlodpi:dpi_conn_newVar(Connection, length(DelList)), + ok = dderlodpi:dpi_stmt_bindByName(Connection, Stmt, <<"IDENTIFIER">>, Var), + StmtBind = #binds{stmt = Stmt, var = Var, data = DataList}, + create_stmts(Rest, TableName, Connection, Columns, [{del, StmtBind} | ResultStmt]); + Error -> + ?Error("Error preparing delete stmt: ~p", [Error]), + close_stmts(Connection, ResultStmt), + {error, iolist_to_binary(["Unable to prepare stmt: ", Sql])} + end; +create_stmts([{upd, []} | Rest], TableName, Connection, Columns, ResultStmt) -> + create_stmts(Rest, TableName, Connection, Columns, ResultStmt); +create_stmts([{upd, UpdList} | Rest], TableName, Connection, Columns, ResultStmts) -> + [{ModifiedCols, _Rows} | RestUpdList] = UpdList, + FilterColumns = filter_columns(ModifiedCols, Columns), + UpdVars = create_upd_vars(FilterColumns), + % TODO: use where part to do optimistic locking. + % WhereVars = create_where_vars(Columns), + Sql = iolist_to_binary([<<"update ">>, table_name(TableName), " set ", UpdVars, " where ", alias_name(TableName), ".ROWID = :IDENTIFIER"]), + case dderlodpi:dpi_conn_prepareStmt(Connection, Sql) of + Stmt when is_reference(Stmt) -> %% TODO: Implement from here down... + BindTypes = [{<<":IDENTIFIER">>, 'SQLT_STR'} | create_bind_types(FilterColumns)], + Stmt:bind_vars(BindTypes), + case proplists:get_value(upd, ResultStmts) of + undefined -> + NewResultStmts = [{upd, [Stmt]} | ResultStmts]; + UpdtStmts -> + NewResultStmts = lists:keyreplace(upd, 1, ResultStmts, {upd, UpdtStmts ++ [Stmt]}) + end, + create_stmts([{upd, RestUpdList} | Rest], TableName, Connection, Columns, NewResultStmts); + Error -> + ?Error("Error preparing update stmt: ~p", [Error]), + close_stmts(Connection, ResultStmts), + {error, iolist_to_binary(["Unable to prepare stmt: ", Sql])} + end; +create_stmts([{ins, []} | Rest], TableName, Connection, Columns, ResultStmt) -> + create_stmts(Rest, TableName, Connection, Columns, ResultStmt); +%[{{1,2,3},[{row,{{}},undefined,2,ins,[<<"3.33">>,<<"22">>,<<"23">>]}]}, +%{{1,3}, +% [{row,{{}},undefined,3,ins,[<<"4.0">>,<<>>,<<"231">>]}, +% {row,{{}},undefined,4,ins,[<<"87.4">>,<<>>,<<"44">>]}]}, +%{{2,3}, +% [{row,{{}},undefined,5,ins,[<<>>,<<"12">>,<<"55">>]}, +% {row,{{}},undefined,6,ins,[<<>>,<<"33">>,<<"22">>]}]}] +create_stmts([{ins, InsList} | Rest], TableName, Connection, Columns, ResultStmts) -> + [{NonEmptyCols, _Rows} | RestInsList] = InsList, + InsColumns = ["(", create_ins_columns(filter_columns(NonEmptyCols, Columns)), ")"], + Sql = iolist_to_binary([<<"insert into ">>, table_name(TableName), " ", InsColumns, " values ", "(", create_ins_vars(filter_columns(NonEmptyCols, Columns)), ")"]), + case Connection:prep_sql(Sql) of + {error, {_ErrorCode, ErrorMsg}} -> + %%TODO: ?Error... + close_stmts(Connection, ResultStmts), + {error, ErrorMsg}; + Stmt -> + BindTypes = create_bind_types(filter_columns(NonEmptyCols, Columns)), + Stmt:bind_vars(BindTypes), + case proplists:get_value(ins, ResultStmts) of + undefined -> + NewResultStmts = [{ins, [Stmt]} | ResultStmts]; + InsStmts -> + NewResultStmts = lists:keyreplace(ins, 1, ResultStmts, {ins, InsStmts ++ [Stmt]}) + end, + create_stmts([{ins, RestInsList} | Rest], TableName, Connection, Columns, NewResultStmts) + end. + +-spec process_delete(term(), term(), list(), list()) -> {ok, list()} | {error, term()}. +process_delete(_Conn, undefined, [], _Columns) -> {ok, []}; +process_delete(Connection, #binds{stmt = Stmt, var = Var}, Rows, _Columns) -> + RowsToDelete = [[Row#row.id] || Row <- Rows], + %% Delete is always only one column (Rowid), so wrap the var in a list. + ok = dderlodpi:dpi_var_set_many(Connection, [Var], RowsToDelete), + case dderlodpi:dpi_stmt_executeMany(Connection, Stmt, length(RowsToDelete), []) of + {error, _DpiNifFile, _Line, #{message := Msg}} -> + {error, list_to_binary(Msg)}; + ok -> + ChangedKeys = [{Row#row.pos, {{}, {}}} || Row <- Rows], + {ok, ChangedKeys} + end. + +-spec process_update(list(), list(), [#stmtCol{}]) -> {ok, list()} | {error, term()}. +process_update(undefined, [], _Columns) -> {ok, []}; +process_update([], [], _Colums) -> {ok, []}; +process_update([PrepStmt | RestStmts], [{ModifiedCols, Rows} | RestRows], Columns) -> + %% TODO: No need to filter the rows for optimistic locking... + FilterRows = [Row#row{values = filter_columns(ModifiedCols, Row#row.values)} || Row <- Rows], + case process_one_update(PrepStmt, FilterRows, filter_columns(ModifiedCols, Columns), Rows, Columns) of + {ok, ChangedKeys} -> + case process_update(RestStmts, RestRows, Columns) of + {ok, RestChangedKeys} -> + {ok, ChangedKeys ++ RestChangedKeys}; + ErrorRest -> + ErrorRest + end; + Error -> + Error + end. + +-spec process_one_update(term(), [#row{}], [#stmtCol{}], [#row{}], [#stmtCol{}]) -> {ok, list()} | {error, term()}. +process_one_update(PrepStmt, FilterRows, FilterColumns, Rows, Columns) -> + %% TODO: Implement updates using the old values on the where clause, (optimistic locking). + RowsToUpdate = [list_to_tuple(create_bind_vals([Row#row.id | Row#row.values], [#stmtCol{type = 'SQLT_STR'} | FilterColumns])) || Row <- FilterRows], + case PrepStmt:exec_stmt(RowsToUpdate, ?NoCommit) of + {rowids, RowIds} -> + case check_rowid(RowIds, Rows) of + true-> + ChangedKeys = [{Row#row.pos, {{}, list_to_tuple(create_changedkey_vals(Row#row.values ++ [Row#row.id], Columns ++ [#stmtCol{type = 'SQLT_STR'}]))}} || Row <- Rows], + {ok, ChangedKeys}; + false -> + {error, <<"Unknown error updating the rows.">>}; + long_rowid -> + {error, <<"Updating tables with universal rowids is not supported yet.">>} + end; + {error, {_ErrorCode, ErrorMsg}}-> + {error, ErrorMsg} + end. + +-spec process_insert(term(), list(), list()) -> {ok, list()} | {error, term()}. +process_insert(undefined, [], _Columns) -> {ok, []}; +process_insert([], [], _Columns) -> {ok, []}; +process_insert([PrepStmt | RestStmts], [{NonEmptyCols, Rows} | RestRows], Columns) -> + FilterRows = [Row#row{values = filter_columns(NonEmptyCols, Row#row.values)} || Row <- Rows], + case process_one_insert(PrepStmt, FilterRows, filter_columns(NonEmptyCols, Columns), Rows, Columns) of + {ok, ChangedKeys} -> + case process_insert(RestStmts, RestRows, Columns) of + {ok, RestChangedKeys} -> + {ok, ChangedKeys ++ RestChangedKeys}; + ErrorRest -> + ErrorRest + end; + Error -> + Error + end. + +-spec process_one_insert(term(), [#row{}], [#stmtCol{}], [#row{}], [#stmtCol{}]) -> {ok, list()} | {error, term()}. +process_one_insert(PrepStmt, FilterRows, FilterColumns, Rows, Columns) -> + RowsToInsert = [list_to_tuple(create_bind_vals(Row#row.values, FilterColumns)) || Row <- FilterRows], + case PrepStmt:exec_stmt(RowsToInsert, ?NoCommit) of + {rowids, RowIds} -> + if + length(RowIds) =:= length(RowsToInsert) -> + case inserted_changed_keys(RowIds, Rows, Columns) of + {error, ErrorMsg} -> + {error, ErrorMsg}; + ChangedKeys -> + {ok, ChangedKeys} + end; + true -> + %% TODO: What is a good message here ? + {error, <<"Error inserting the rows.">>} + end; + {error, {_ErrorCode, ErrorMsg}}-> + {error, ErrorMsg} + end. + +-spec split_changes(list()) -> {[#row{}], [#row{}], [#row{}]}. +split_changes(ChangeList) -> + split_changes(ChangeList, {[], [], []}). + +split_changes([], Result) -> Result; +split_changes([ListRow | ChangeList], Result) -> + [Pos, Op, Index | Values] = ListRow, + case Index of + {{}, {}} -> RowId = undefined; + {{}, Idx} -> RowId = element(tuple_size(Idx), Idx); + _ -> RowId = undefined + end, + Row = #row{index = Index, + id = RowId, + pos = Pos, + op = Op, + values = Values}, + NewResult = add_to_split_result(Row, Result), + split_changes(ChangeList, NewResult). + +%%TODO: Change for less verbose option setelement... +add_to_split_result(#row{op = del} = Row, {DeleteRows, UpdateRows, InsertRows}) -> + {[Row | DeleteRows], UpdateRows, InsertRows}; +add_to_split_result(#row{op = upd} = Row, {DeleteRows, UpdateRows, InsertRows}) -> + {DeleteRows, [Row | UpdateRows], InsertRows}; +add_to_split_result(#row{op = ins} = Row, {DeleteRows, UpdateRows, InsertRows}) -> + {DeleteRows, UpdateRows, [Row | InsertRows]}. + +filter_columns(ModifiedCols, Columns) -> + ModifiedColsList = tuple_to_list(ModifiedCols), + [lists:nth(ColIdx, Columns) || ColIdx <- ModifiedColsList]. + +create_upd_vars([#stmtCol{} = Col]) -> [Col#stmtCol.tag, " = :", "\"", Col#stmtCol.tag, "\""]; +create_upd_vars([#stmtCol{} = Col | Rest]) -> [Col#stmtCol.tag, " = :", "\"", Col#stmtCol.tag, "\"", ", ", create_upd_vars(Rest)]. + +create_bind_types([]) -> []; +create_bind_types([#stmtCol{} = Col | Rest]) -> + [{iolist_to_binary([":", "\"", Col#stmtCol.tag, "\""]), bind_types_map(Col#stmtCol.type)} | create_bind_types(Rest)]. + +create_ins_columns([#stmtCol{} = Col]) -> [Col#stmtCol.tag]; +create_ins_columns([#stmtCol{} = Col | Rest]) -> [Col#stmtCol.tag, ", ", create_ins_columns(Rest)]. + +create_ins_vars([#stmtCol{} = Col]) -> [":", "\"", Col#stmtCol.tag, "\""]; +create_ins_vars([#stmtCol{} = Col | Rest]) -> [":", "\"", Col#stmtCol.tag, "\"", ", ", create_ins_vars(Rest)]. + +create_changedkey_vals([], _Cols) -> []; +create_changedkey_vals([<<>> | Rest], [#stmtCol{} | RestCols]) -> + [<<>> | create_changedkey_vals(Rest, RestCols)]; +create_changedkey_vals([Value | Rest], [#stmtCol{type = 'SQLT_NUM', len = Scale, prec = dynamic} | RestCols]) -> + Number = imem_datatype:io_to_decimal(Value, undefined, Scale), + [Number | create_changedkey_vals(Rest, RestCols)]; +create_changedkey_vals([Value | Rest], [#stmtCol{type = Type, len = Len, prec = Prec} | RestCols]) -> + FormattedValue = case Type of + 'SQLT_DAT' -> dderloci_utils:dderltime_to_ora(Value); + 'SQLT_TIMESTAMP' -> dderloci_utils:dderlts_to_ora(Value); + 'SQLT_TIMESTAMP_TZ' -> dderloci_utils:dderltstz_to_ora(Value); + 'SQLT_NUM' -> imem_datatype:io_to_decimal(Value, Len, Prec); + 'SQLT_BIN' -> imem_datatype:binary_to_io(Value); + _ -> Value + end, + [FormattedValue | create_changedkey_vals(Rest, RestCols)]. + +create_bind_vals([], _Cols) -> []; +create_bind_vals([<<>> | Rest], [_Col | RestCols]) -> + [<<>> | create_bind_vals(Rest, RestCols)]; +create_bind_vals([Value | Rest], [#stmtCol{type = Type, len = Len} | RestCols]) -> + FormattedValue = case Type of + 'SQLT_DAT' -> dderloci_utils:dderltime_to_ora(Value); + 'SQLT_TIMESTAMP' -> dderloci_utils:dderlts_to_ora(Value); + 'SQLT_TIMESTAMP_TZ' -> dderloci_utils:dderltstz_to_ora(Value); + 'SQLT_NUM' -> dderloci_utils:oranumber_encode(Value); + 'SQLT_BIN' -> imem_datatype:io_to_binary(Value, Len); + _ -> Value + end, + [FormattedValue | create_bind_vals(Rest, RestCols)]. + +bind_types_map('SQLT_NUM') -> 'SQLT_VNU'; +%% There is no really support for this types at the moment so use string to send the data... +bind_types_map('SQLT_INT') -> 'SQLT_STR'; +bind_types_map('SQLT_FLT') -> 'SQLT_STR'; +bind_types_map('SQLT_CLOB') -> 'SQLT_STR'; +bind_types_map(Type) -> Type. + +-spec inserted_changed_keys([binary()], [#row{}], list()) -> [tuple()]. +inserted_changed_keys([], [], _) -> []; +inserted_changed_keys([RowId | RestRowIds], [Row | RestRows], Columns) -> + [{Row#row.pos, {{}, list_to_tuple(create_changedkey_vals(Row#row.values ++ [RowId], Columns ++ [#stmtCol{type = 'SQLT_STR'}]))}} | inserted_changed_keys(RestRowIds, RestRows, Columns)]; +inserted_changed_keys(_, _, _) -> + {error, <<"Invalid row keys returned by the oracle driver">>}. + +-spec split_by_columns_mod([#row{}], [#stmtCol{}], [{tuple(), [#row{}]}]) -> [{tuple(), [#row{}]}]. +split_by_columns_mod([], _Columns, Result) -> Result; +split_by_columns_mod([#row{} = Row | RestRows], Columns, Result) -> + case list_to_tuple(get_modified_cols(Row, Columns)) of + {} -> %% No changes in the row, nothing to do + NewResult = Result; + ModifiedCols -> + case proplists:get_value(ModifiedCols, Result) of + undefined -> + NewResult = [{ModifiedCols, [Row]} | Result]; + RowsSameCol -> + NewResult = lists:keyreplace(ModifiedCols, 1, Result, {ModifiedCols, [Row | RowsSameCol]}) + end + end, + split_by_columns_mod(RestRows, Columns, NewResult). + +-spec get_modified_cols(#row{}, [#stmtCol{}]) -> [integer()]. +get_modified_cols(#row{index = Index, values = Values}, Columns) -> + {{}, OriginalValuesTuple} = Index, + [_RowId | OriginalValuesR] = lists:reverse(tuple_to_list(OriginalValuesTuple)), + OriginalValues = lists:reverse(OriginalValuesR), + %% If we dont have rowid should be read only field. + LengthOrig = length(OriginalValues), + LengthOrig = length(Values), + get_modified_cols(OriginalValues, Values, Columns, 1). + +%% TODO: This should apply the same functions used by the rowfun to avoid repeating the code here again +%% null -> <<>> should be the default convertion . +-spec get_modified_cols([binary()], [binary()], [#stmtCol{}], pos_integer()) -> [integer()]. +get_modified_cols([], [], [], _) -> []; +get_modified_cols([_Orig | RestOrig], [_Value | RestValues], [#stmtCol{readonly=true} | Columns], Pos) -> + get_modified_cols(RestOrig, RestValues, Columns, Pos + 1); +get_modified_cols([<<>> | RestOrig], [<<>> | RestValues], [#stmtCol{} | Columns], Pos) -> + get_modified_cols(RestOrig, RestValues, Columns, Pos + 1); +get_modified_cols([<<>> | RestOrig], [_Value | RestValues], [#stmtCol{} | Columns], Pos) -> + [Pos | get_modified_cols(RestOrig, RestValues, Columns, Pos + 1)]; +get_modified_cols([OrigVal | RestOrig], [Value | RestValues], [#stmtCol{type = 'SQLT_DAT'} | Columns], Pos) -> + case dderloci_utils:ora_to_dderltime(OrigVal) of + Value -> + get_modified_cols(RestOrig, RestValues, Columns, Pos + 1); + _ -> + [Pos | get_modified_cols(RestOrig, RestValues, Columns, Pos + 1)] + end; +get_modified_cols([OrigVal | RestOrig], [Value | RestValues], [#stmtCol{type = 'SQLT_TIMESTAMP'} | Columns], Pos) -> + case dderloci_utils:ora_to_dderlts(OrigVal) of + Value -> + get_modified_cols(RestOrig, RestValues, Columns, Pos + 1); + _ -> + [Pos | get_modified_cols(RestOrig, RestValues, Columns, Pos + 1)] + end; +get_modified_cols([OrigVal | RestOrig], [Value | RestValues], [#stmtCol{type = 'SQLT_TIMESTAMP_TZ'} | Columns], Pos) -> + case dderloci_utils:ora_to_dderltstz(OrigVal) of + Value -> + get_modified_cols(RestOrig, RestValues, Columns, Pos + 1); + _ -> + [Pos | get_modified_cols(RestOrig, RestValues, Columns, Pos + 1)] + end; +get_modified_cols([null | RestOrig], [<<>> | RestValues], [#stmtCol{type = 'SQLT_NUM'} | Columns], Pos) -> + get_modified_cols(RestOrig, RestValues, Columns, Pos + 1); +get_modified_cols([null | RestOrig], [_Value | RestValues], [#stmtCol{type = 'SQLT_NUM'} | Columns], Pos) -> + [Pos | get_modified_cols(RestOrig, RestValues, Columns, Pos + 1)]; +get_modified_cols([Mantissa | RestOrig], [Value | RestValues], [#stmtCol{type = 'SQLT_NUM', len = Scale, prec = dynamic} | Columns], Pos) -> + Number = dderloci_utils:clean_dynamic_prec(imem_datatype:decimal_to_io(Mantissa, Scale)), + if + Number =:= Value -> + get_modified_cols(RestOrig, RestValues, Columns, Pos + 1); + true -> + [Pos | get_modified_cols(RestOrig, RestValues, Columns, Pos + 1)] + end; +get_modified_cols([Mantissa | RestOrig], [Value | RestValues], [#stmtCol{type = 'SQLT_NUM', prec = Prec} | Columns], Pos) -> + Number = imem_datatype:decimal_to_io(Mantissa, Prec), + if + Number =:= Value -> + get_modified_cols(RestOrig, RestValues, Columns, Pos + 1); + true -> + [Pos | get_modified_cols(RestOrig, RestValues, Columns, Pos + 1)] + end; +get_modified_cols([OrigVal | RestOrig], [OrigVal | RestValues], [#stmtCol{} | Columns], Pos) -> + get_modified_cols(RestOrig, RestValues, Columns, Pos + 1); +get_modified_cols([_OrigVal | RestOrig], [_Value | RestValues], [#stmtCol{} | Columns], Pos) -> + [Pos | get_modified_cols(RestOrig, RestValues, Columns, Pos + 1)]. + +-spec split_by_non_empty([#row{}], [{tuple(),[#row{}]}]) -> [{tuple(), [#row{}]}]. +split_by_non_empty([], Result) -> Result; +split_by_non_empty([#row{values = Values} = Row | RestRows], Result) -> + NonEmptyCols = list_to_tuple(get_non_empty_cols(Values, 1)), + case proplists:get_value(NonEmptyCols, Result) of + undefined -> + NewResult = [{NonEmptyCols, [Row]} | Result]; + RowsSameCol -> + NewResult = lists:keyreplace(NonEmptyCols, 1, Result, {NonEmptyCols, [Row | RowsSameCol]}) + end, + split_by_non_empty(RestRows, NewResult). + +-spec get_non_empty_cols([binary()], pos_integer()) -> [integer()]. +get_non_empty_cols([], _) -> []; +get_non_empty_cols([<<>> | RestValues], Pos) -> + get_non_empty_cols(RestValues, Pos + 1); +get_non_empty_cols([_Value | RestValues], Pos) -> + [Pos | get_non_empty_cols(RestValues, Pos + 1)]. + +-spec close_stmts(term(), list() | undefined) -> ok. +close_stmts(_Conn, undefined) -> ok; +close_stmts(_Conn, []) -> ok; +close_stmts(Conn, [{del, Binds} | RestBinds]) -> + close_and_release_binds(Conn, Binds), + close_stmts(Conn, RestBinds); +close_stmts(Conn, [{upd, BindsList} | RestBinds]) -> + [close_and_release_binds(Conn, Binds) || Binds <- BindsList], + close_stmts(Conn, RestBinds); +close_stmts(Conn, [{ins, BindsList} | RestBinds]) -> + [close_and_release_binds(Conn, Binds) || Binds <- BindsList], + close_stmts(Conn, RestBinds); +close_stmts(Conn, [Binds | RestBinds]) -> + close_and_release_binds(Conn, Binds), + close_stmts(Conn, RestBinds). + +-spec close_and_release_binds(reference(), #bind{}) -> ok. +close_and_release_binds(Conn, #binds{stmt = Stmt, var = Var, data = Data}) -> + dderlodpi:dpi_stmt_close(Conn, Stmt), + dderlodpi:dpi_var_release(Conn, Var), + dderlodpi:dpi_data_release(Conn, Data). + +-spec check_rowid([binary()], [#row{}]) -> true | false | long_rowid. +check_rowid(RowIds, Rows) when length(RowIds) =:= length(Rows) -> + check_member(RowIds, Rows); +check_rowid(_RowIds, _Rows) -> + false. + +-spec check_member([binary()], [#row{}]) -> true | false | long_rowid. +check_member(_, []) -> true; +check_member(RowIds, [Row | RestRows]) -> + case lists:member(Row#row.id, RowIds) of + true -> + check_member(RowIds, RestRows); + false -> + case size(Row#row.id) > 19 of + true -> long_rowid; + false -> false + end + end. diff --git a/src/odpi_adapter.erl b/src/odpi_adapter.erl index 376dbf06..bd765810 100644 --- a/src/odpi_adapter.erl +++ b/src/odpi_adapter.erl @@ -897,11 +897,11 @@ generate_fsmctx(#stmtResult{ ,update_cursor_prepare_fun = fun(ChangeList) -> ?Debug("The stmtref ~p, the table name: ~p and the change list: ~n~p", [StmtRef, TableName, ChangeList]), - dderloci_stmt:prepare(TableName, ChangeList, Connection, Clms) + dderlodpi_stmt:prepare(TableName, ChangeList, Connection, Clms) end ,update_cursor_execute_fun = fun(_Lock, PrepStmt) -> - Result = dderloci_stmt:execute(PrepStmt), + Result = dderlodpi_stmt:execute(PrepStmt), ?Debug("The result from the exec ~p", [Result]), Result end