Skip to content

Commit

Permalink
On checkpoint wait till corresponding WAL archived into S3
Browse files Browse the repository at this point in the history
  • Loading branch information
akorotkov committed May 13, 2024
1 parent bd4dde0 commit df9e21b
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 9 deletions.
2 changes: 1 addition & 1 deletion include/s3/checkpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
#ifndef __S3_CHECKPOINT_H__
#define __S3_CHECKPOINT_H__

extern void s3_perform_backup(S3TaskLocation location);
extern void s3_perform_backup(int flags, S3TaskLocation location);

#endif /* __S3_CHECKPOINT_H__ */
107 changes: 106 additions & 1 deletion src/checkpoint/checkpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
#include "utils/stopevent.h"
#include "utils/ucm.h"

#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "common/hashfn.h"
#include "miscadmin.h"
#include "pgstat.h"
Expand Down Expand Up @@ -1308,7 +1310,7 @@ o_perform_checkpoint(XLogRecPtr redo_pos, int flags)
checkpoint_state->lastCheckpointNumber);

if (orioledb_s3_mode)
s3_perform_backup(maxLocation);
s3_perform_backup(flags, maxLocation);

if (next_CheckPoint_hook)
next_CheckPoint_hook(redo_pos, flags);
Expand Down Expand Up @@ -1423,16 +1425,119 @@ checkpoint_temporary_tree(int flags, BTreeDescr *descr)
chkp_inc_changecount_after(checkpoint_state);
}

/*
 * Tell whether WAL file "xlog" needs no further attention from the archiver.
 *
 * This follows the logic of XLogArchiveCheckDone(), but never notifies the
 * archiver: only the ".done" status file is looked at.
 *
 * Returns true when archiving is off, when we are in archive recovery and
 * archive_mode is not "always", or when the ".done" status file exists.
 * Returns false otherwise.
 */
static bool
check_archive_done(const char *xlog)
{
	char		donePath[MAXPGPATH];
	struct stat st;

	/*
	 * Nothing to wait for if archive_mode is "off", or if we are in archive
	 * recovery while archive_mode is not "always".
	 */
	if (!XLogArchivingActive() ||
		(!XLogArchivingAlways() &&
		 GetRecoveryState() == RECOVERY_STATE_ARCHIVE))
		return true;

	/*
	 * From here on we are either a primary with archive_mode "on"/"always",
	 * or a standby with archive_mode "always".  The archiver is done with
	 * the file exactly when its ".done" status file can be stat()ed.
	 */
	StatusFilePath(donePath, xlog, ".done");

	return stat(donePath, &st) == 0;
}

/*
 * Walk XLOGDIR and call s3_schedule_wal_file_write() for every WAL segment
 * file that check_archive_done() does not report as done.
 *
 * Returns the greatest location handed back by those calls, or 0 if nothing
 * was scheduled.
 */
static S3TaskLocation
after_checkpoint_sync_wal(void)
{
	S3TaskLocation highest = 0;
	DIR		   *walDir = AllocateDir(XLOGDIR);
	struct dirent *entry;

	for (entry = ReadDir(walDir, XLOGDIR);
		 entry != NULL;
		 entry = ReadDir(walDir, XLOGDIR))
	{
		/*
		 * Only XLOG segment files that are not archived yet are of
		 * interest; everything else in the directory is skipped.
		 */
		if (IsXLogFileName(entry->d_name) &&
			!check_archive_done(entry->d_name))
		{
			S3TaskLocation scheduled;

			scheduled = s3_schedule_wal_file_write(entry->d_name);
			if (scheduled > highest)
				highest = scheduled;
		}
	}

	FreeDir(walDir);

	return highest;
}

/*
 * Hook run after checkpoint cleanup.
 *
 * checkPointRedo - redo position of the checkpoint, passed on to
 *					o_sys_caches_delete_by_lsn().
 * flags		  - checkpoint flags.
 *
 * Besides the sys caches cleanup, in S3 mode (and only once recovery is
 * done) this makes sure the WAL up to the checkpoint is safely stored before
 * the control files are scheduled for upload:
 *
 *	- if WAL insertion is allowed, a WAL switch is requested and we wait
 *	  until check_archive_done() reports the switched-out segment as done;
 *	- otherwise the not yet archived WAL files are scheduled directly via
 *	  after_checkpoint_sync_wal().
 *
 * Finally both control files are scheduled and we wait for the S3 queue to
 * reach the greatest scheduled location.
 */
void
o_after_checkpoint_cleanup_hook(XLogRecPtr checkPointRedo, int flags)
{
	S3TaskLocation maxLocation = 0;
	S3TaskLocation location;
	uint32		chkpNum = checkpoint_state->lastCheckpointNumber;

	/* called at the end of StartupXLOG */
	*was_in_recovery = flags == 0;

	if (!(flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY)))
	{
		o_sys_caches_delete_by_lsn(checkPointRedo);
	}

	if (!orioledb_s3_mode)
		return;

	if (GetRecoveryState() != RECOVERY_STATE_DONE)
		return;

	if (XLogInsertAllowed())
	{
		XLogRecPtr	switchpoint;
		XLogSegNo	xlogsegno;
		char		xlogfilename[MAXFNAMELEN];

		/*
		 * Wait till archiver finishes with the checkpoint record.
		 *
		 * The segment file name must carry the current WAL insertion
		 * timeline rather than a fixed timeline 1: on a cluster whose
		 * timeline is not 1 the ".done" file for a timeline-1 name would
		 * never appear and the loop below would never terminate.  Recovery
		 * is known to be done at this point, which is what
		 * GetWALInsertionTimeLine() requires.
		 */
		switchpoint = RequestXLogSwitch(false);
		XLByteToPrevSeg(switchpoint, xlogsegno, wal_segment_size);
		XLogFileName(xlogfilename, GetWALInsertionTimeLine(),
					 xlogsegno, wal_segment_size);

		/*
		 * NOTE(review): this poll has neither a timeout nor an interrupt
		 * check; it relies on the archiver eventually producing the ".done"
		 * file.  Confirm that is acceptable for every caller.
		 */
		while (!check_archive_done(xlogfilename))
		{
			pg_usleep(100000L);
		}
	}
	else
	{
		location = after_checkpoint_sync_wal();
		maxLocation = Max(maxLocation, location);
	}

	location = s3_schedule_file_write(chkpNum, XLOG_CONTROL_FILE, false);
	maxLocation = Max(maxLocation, location);
	location = s3_schedule_file_write(chkpNum, ORIOLEDB_DATA_DIR "/control", false);
	maxLocation = Max(maxLocation, location);
	s3_queue_wait_for_location(maxLocation);
}

/*
Expand Down
10 changes: 3 additions & 7 deletions src/s3/checkpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <time.h>

#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xlogbackup.h"
#include "access/xloginsert.h"
#include "catalog/pg_control.h"
Expand Down Expand Up @@ -220,7 +221,7 @@ static List *get_tablespaces(StringInfo tblspcmapfile);
* clobbered by longjmp" from stupider versions of gcc.
*/
void
s3_perform_backup(S3TaskLocation maxLocation)
s3_perform_backup(int flags, S3TaskLocation maxLocation)
{
uint32 chkpNum = checkpoint_state->lastCheckpointNumber;
S3BackupState state;
Expand Down Expand Up @@ -260,16 +261,12 @@ s3_perform_backup(S3TaskLocation maxLocation)
/* Then the bulk of the files... */
s3_backup_scan_dir(&state, ".", 1, NULL);

location = s3_schedule_file_write(chkpNum, XLOG_CONTROL_FILE, false);
maxLocation = Max(maxLocation, location);
location = s3_schedule_file_write(chkpNum, ORIOLEDB_DATA_DIR "/control", false);
maxLocation = Max(maxLocation, location);

xidFilename = psprintf(XID_FILENAME_FORMAT,
checkpoint_state->lastCheckpointNumber);
location = s3_schedule_file_write(chkpNum, xidFilename, false);
maxLocation = Max(maxLocation, location);
pfree(xidFilename);

}
else
{
Expand All @@ -283,7 +280,6 @@ s3_perform_backup(S3TaskLocation maxLocation)
location = flush_small_files(&state);
maxLocation = Max(maxLocation, location);


pfree(tablespaceMapData.data);
s3_queue_wait_for_location(maxLocation);
}
Expand Down

0 comments on commit df9e21b

Please sign in to comment.