Skip to content

Commit

Permalink
FileCacheQueueScheduler使用BloomFilter进行去重 (#1176)
Browse files Browse the repository at this point in the history
Co-authored-by: xiezc <blanexie@qq.com>
  • Loading branch information
blanexie and xiezc authored Aug 19, 2024
1 parent 2c135da commit 15ec80f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,6 @@ private void onDownloadSuccess(Request request, Page page) {
logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode());
}
sleep(site.getSleepTime());
return;
}

private void onDownloaderFail(Request request) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,13 @@
package us.codecraft.webmagic.scheduler;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;

import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.component.DuplicateRemover;

import java.io.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;


/**
Expand All @@ -32,7 +16,7 @@
* @author code4crafter@gmail.com <br>
* @since 0.2.0
*/
public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler,Closeable {
public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, Closeable {

private String filePath = System.getProperty("java.io.tmpdir");

Expand All @@ -52,8 +36,6 @@ public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implement

private BlockingQueue<Request> queue;

private Set<String> urls;

private ScheduledExecutorService flushThreadPool;

public FileCacheQueueScheduler(String filePath) {
Expand Down Expand Up @@ -83,36 +65,13 @@ private void init(Task task) {
}

private void initDuplicateRemover() {
setDuplicateRemover(
new DuplicateRemover() {
@Override
public boolean isDuplicate(Request request, Task task) {
if (!inited.get()) {
init(task);
}
return !urls.add(request.getUrl());
}

@Override
public void resetDuplicateCheck(Task task) {
urls.clear();
}

@Override
public int getTotalRequestsCount(Task task) {
return urls.size();
}
});
BloomFilterDuplicateRemover bloomFilterDuplicateRemover = new BloomFilterDuplicateRemover(this.filePath.hashCode());
setDuplicateRemover(bloomFilterDuplicateRemover);
}

private void initFlushThread() {
flushThreadPool = Executors.newScheduledThreadPool(1);
flushThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
flush();
}
}, 10, 10, TimeUnit.SECONDS);
flushThreadPool = Executors.newScheduledThreadPool(1);
flushThreadPool.scheduleAtFixedRate(this::flush, 10, 10, TimeUnit.SECONDS);
}

private void initWriter() {
Expand All @@ -127,7 +86,6 @@ private void initWriter() {
private void readFile() {
try {
queue = new LinkedBlockingQueue<Request>();
urls = new LinkedHashSet<String>();
readCursorFile();
readUrlFile();
// initDuplicateRemover();
Expand All @@ -140,46 +98,43 @@ private void readFile() {
}

private void readUrlFile() throws IOException {
String line;
BufferedReader fileUrlReader = null;
try {
fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)));
try (BufferedReader fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)))) {
String line;
int lineReaded = 0;
while ((line = fileUrlReader.readLine()) != null) {
urls.add(line.trim());
Request request = deserializeRequest(line);
this.getDuplicateRemover().isDuplicate(request, null);
lineReaded++;
if (lineReaded > cursor.get()) {
queue.add(deserializeRequest(line));
queue.add(request);
}
}
} finally {
if (fileUrlReader != null) {
IOUtils.closeQuietly(fileUrlReader);
}
}
}

private void readCursorFile() throws IOException {
BufferedReader fileCursorReader = null;
try {
fileCursorReader = new BufferedReader(new FileReader(getFileName(fileCursor)));
String fileName = getFileName(fileCursor);
try (BufferedReader fileCursorReader = new BufferedReader(new FileReader(fileName))) {
String line;
String lastLine = null;
//read the last number
while ((line = fileCursorReader.readLine()) != null) {
cursor = new AtomicInteger(NumberUtils.toInt(line));
line = line.trim();
if (!line.isEmpty()) {
lastLine = line;
}
}
} finally {
if (fileCursorReader != null) {
IOUtils.closeQuietly(fileCursorReader);
if (lastLine != null) {
cursor.set(NumberUtils.toInt(line));
}
}
}

public void close() throws IOException {
flushThreadPool.shutdown();
fileUrlWriter.close();
fileCursorWriter.close();
}
flushThreadPool.shutdown();
fileUrlWriter.close();
fileCursorWriter.close();
}

private String getFileName(String filename) {
return filePath + task.getUUID() + filename;
Expand Down

0 comments on commit 15ec80f

Please sign in to comment.