Skip to content

Commit

Permalink
Made DiskWriterQueue more reliable so that it can handle multiple bac…
Browse files Browse the repository at this point in the history
…k calls better (litedb-org#2126)

* Made DiskWriterQueue more reliable so that it can handle multiple back-to-back calls better.

Co-authored-by: Kaan Kaya <kaan.kaya@nice.com>
  • Loading branch information
kaan-kaya and Kaan Kaya authored Dec 29, 2021
1 parent c5db78e commit 431be5f
Showing 1 changed file with 46 additions and 24 deletions.
70 changes: 46 additions & 24 deletions LiteDB/Engine/Disk/DiskWriterQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ internal class DiskWriterQueue : IDisposable

private ConcurrentQueue<PageBuffer> _queue = new ConcurrentQueue<PageBuffer>();

private int _running = 0;

public DiskWriterQueue(Stream stream)
{
_stream = stream;
Expand Down Expand Up @@ -54,10 +56,15 @@ public void Run()
{
lock (_queue)
{
if (_queue.Count > 0 && (_task == null || _task.IsCompleted))
if (_queue.Count == 0) return;

var oldValue = Interlocked.CompareExchange(ref _running, 1, 0);

if (oldValue == 0)
{
// Schedule a new thread to process the pages in the queue.
// https://blog.stephencleary.com/2013/08/startnew-is-dangerous.html
_task = Task.Run(this.ExecuteQueue);
_task = Task.Run(ExecuteQueue);
}
}
}
Expand All @@ -74,10 +81,7 @@ public void Wait()
_task.Wait();
}

if (_queue.Count > 0)
{
this.ExecuteQueue();
}
Run();
}

ENSURE(_queue.Count == 0, "queue should be empty after wait() call");
Expand All @@ -90,32 +94,50 @@ private void ExecuteQueue()
{
if (_queue.Count == 0) return;

var count = 0;

try
do
{
while (_queue.TryDequeue(out var page))
if (_queue.TryDequeue(out var page))
{
ENSURE(page.ShareCounter > 0, "page must be shared at least 1");
WritePageToStream(page);
}

// set stream position according to page
_stream.Position = page.Position;
while (page == null)
{
_stream.FlushToDisk();
Volatile.Write(ref _running, 0);

_stream.Write(page.Array, page.Offset, PAGE_SIZE);
if (!_queue.Any()) return;

// release page here (no page use after this)
page.Release();
// Another item was added to the queue after we detected it was empty.
var oldValue = Interlocked.CompareExchange(ref _running, 1, 0);

count++;
if (oldValue == 1)
{
// A new thread was already scheduled for execution, this thread can return.
return;
}

// This thread will continue to process the queue as a new thread was not scheduled.
_queue.TryDequeue(out page);
WritePageToStream(page);
}

// after this I will have 100% sure data are safe on log file
_stream.FlushToDisk();
}
catch (IOException)
{
//TODO: notify database to stop working (throw error in all operations)
}
} while (true);
}

private void WritePageToStream(PageBuffer page)
{
if (page == null) return;

ENSURE(page.ShareCounter > 0, "page must be shared at least 1");

// set stream position according to page
_stream.Position = page.Position;

_stream.Write(page.Array, page.Offset, PAGE_SIZE);

// release page here (no page use after this)
page.Release();
}

public void Dispose()
Expand Down

0 comments on commit 431be5f

Please sign in to comment.