Skip to content

Commit

Permalink
Allow transaction manager to register a transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
erick576 authored and highker committed Jun 20, 2022
1 parent 49311b0 commit e6f0a70
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ public List<TransactionInfo> getAllTransactionInfos()
return delegate.getAllTransactionInfos();
}

@Override
public void tryRegisterTransaction(TransactionInfo transactionInfo)
{
delegate.tryRegisterTransaction(transactionInfo);
}

@Override
public TransactionId beginTransaction(boolean autoCommitContext)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,12 @@ public class InMemoryTransactionManager
private final Map<String, FunctionNamespaceManager<?>> functionNamespaceManagers = new HashMap<>();
private final Map<String, String> companionCatalogs;

private InMemoryTransactionManager(Duration idleTimeout, int maxFinishingConcurrency, CatalogManager catalogManager, Executor finishingExecutor, Map<String, String> companionCatalogs)
private InMemoryTransactionManager(
Duration idleTimeout,
int maxFinishingConcurrency,
CatalogManager catalogManager,
Executor finishingExecutor,
Map<String, String> companionCatalogs)
{
this.catalogManager = catalogManager;
requireNonNull(idleTimeout, "idleTimeout is null");
Expand All @@ -113,7 +118,12 @@ public static TransactionManager create(
CatalogManager catalogManager,
ExecutorService finishingExecutor)
{
InMemoryTransactionManager transactionManager = new InMemoryTransactionManager(config.getIdleTimeout(), config.getMaxFinishingConcurrency(), catalogManager, finishingExecutor, config.getCompanionCatalogs());
InMemoryTransactionManager transactionManager = new InMemoryTransactionManager(
config.getIdleTimeout(),
config.getMaxFinishingConcurrency(),
catalogManager,
finishingExecutor,
config.getCompanionCatalogs());
transactionManager.scheduleIdleChecks(config.getIdleCheckInterval(), idleCheckExecutor);
return transactionManager;
}
Expand Down Expand Up @@ -174,6 +184,16 @@ public List<TransactionInfo> getAllTransactionInfos()
.collect(toImmutableList());
}

@Override
public void tryRegisterTransaction(TransactionInfo transactionInfo)
{
TransactionId transactionId = transactionInfo.getTransactionId();
if (transactions.containsKey(transactionId)) {
return;
}
registerTransaction(transactionId, transactionInfo.getIsolationLevel(), transactionInfo.isReadOnly(), transactionInfo.isAutoCommitContext());
}

@Override
public TransactionId beginTransaction(boolean autoCommitContext)
{
Expand All @@ -184,9 +204,7 @@ public TransactionId beginTransaction(boolean autoCommitContext)
public TransactionId beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommitContext)
{
TransactionId transactionId = TransactionId.create();
BoundedExecutor executor = new BoundedExecutor(finishingExecutor, maxFinishingConcurrency);
TransactionMetadata transactionMetadata = new TransactionMetadata(transactionId, isolationLevel, readOnly, autoCommitContext, catalogManager, executor, functionNamespaceManagers, companionCatalogs);
checkState(transactions.put(transactionId, transactionMetadata) == null, "Duplicate transaction ID: %s", transactionId);
registerTransaction(transactionId, isolationLevel, readOnly, autoCommitContext);
return transactionId;
}

Expand Down Expand Up @@ -283,6 +301,21 @@ private TransactionMetadata getTransactionMetadata(TransactionId transactionId)
return transactionMetadata;
}

private void registerTransaction(TransactionId transactionId, IsolationLevel isolationLevel, boolean readOnly, boolean autoCommitContext)
{
BoundedExecutor executor = new BoundedExecutor(finishingExecutor, maxFinishingConcurrency);
TransactionMetadata transactionMetadata = new TransactionMetadata(
transactionId,
isolationLevel,
readOnly,
autoCommitContext,
catalogManager,
executor,
functionNamespaceManagers,
companionCatalogs);
checkState(transactions.put(transactionId, transactionMetadata) == null, "Duplicate transaction ID: %s", transactionId);
}

private Optional<TransactionMetadata> tryGetTransactionMetadata(TransactionId transactionId)
{
return Optional.ofNullable(transactions.get(transactionId));
Expand Down Expand Up @@ -427,7 +460,9 @@ private synchronized Optional<ConnectorId> getConnectorId(String catalogName)

if (companionCatalogs.containsKey(catalogName)) {
Optional<Catalog> companionCatalog = catalogManager.getCatalog(companionCatalogs.get(catalogName));
checkArgument(companionCatalog.isPresent(), format("Invalid config, no catalog exists for catalog name %s: %s", catalogName, companionCatalogs.get(catalogName)));
checkArgument(
companionCatalog.isPresent(),
format("Invalid config, no catalog exists for catalog name %s: %s", catalogName, companionCatalogs.get(catalogName)));
registerCatalog(companionCatalog.get());
}
}
Expand Down Expand Up @@ -570,7 +605,10 @@ public synchronized ListenableFuture<?> asyncCommit()
// for transactions with read and write, we only return the commit handle for write query.
ConnectorTransactionMetadata writeConnector = connectorIdToMetadata.get(writeConnectorId);
Supplier<ListenableFuture<?>> commitFunctionNamespaceTransactions = () -> functionNamespaceFuture;
ListenableFuture<?> readOnlyCommitFuture = Futures.transformAsync(commitFunctionNamespaceTransactions.get(), ignored -> commitReadOnlyConnectors.get(), directExecutor());
ListenableFuture<?> readOnlyCommitFuture = Futures.transformAsync(
commitFunctionNamespaceTransactions.get(),
ignored -> commitReadOnlyConnectors.get(),
directExecutor());
ListenableFuture<?> writeCommitFuture = Futures.transformAsync(readOnlyCommitFuture, ignored -> finishingExecutor.submit(writeConnector::commit), directExecutor());
addExceptionCallback(writeCommitFuture, this::abortInternal);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public List<TransactionInfo> getAllTransactionInfos()
throw new UnsupportedOperationException();
}

@Override
public void tryRegisterTransaction(TransactionInfo transactionInfo)
{
throw new UnsupportedOperationException();
}

@Override
public TransactionId beginTransaction(boolean autoCommitContext)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public interface TransactionManager

List<TransactionInfo> getAllTransactionInfos();

void tryRegisterTransaction(TransactionInfo transactionInfo);

TransactionId beginTransaction(boolean autoCommitContext);

TransactionId beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommitContext);
Expand Down

0 comments on commit e6f0a70

Please sign in to comment.