Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FileCacheQueueScheduler使用BloomFilter进行去重 #1176

Merged
merged 1 commit into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public Spider scheduler(Scheduler scheduler) {
*/
public Spider setScheduler(Scheduler updateScheduler) {
checkIfRunning();
SpiderScheduler oldScheduler = this.scheduler;
Scheduler oldScheduler = scheduler.getScheduler();
scheduler.setScheduler(updateScheduler);
if (oldScheduler != null) {
Request request;
Expand Down Expand Up @@ -458,7 +458,6 @@ private void onDownloadSuccess(Request request, Page page) {
logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode());
}
sleep(site.getSleepTime());
return;
}

private void onDownloaderFail(Request request) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,13 @@
package us.codecraft.webmagic.scheduler;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;

import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.component.DuplicateRemover;

import java.io.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;


/**
Expand All @@ -32,7 +16,7 @@
* @author code4crafter@gmail.com <br>
* @since 0.2.0
*/
public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler,Closeable {
public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, Closeable {

private String filePath = System.getProperty("java.io.tmpdir");

Expand All @@ -52,8 +36,6 @@ public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implement

private BlockingQueue<Request> queue;

private Set<String> urls;

private ScheduledExecutorService flushThreadPool;

public FileCacheQueueScheduler(String filePath) {
Expand Down Expand Up @@ -83,36 +65,13 @@ private void init(Task task) {
}

private void initDuplicateRemover() {
setDuplicateRemover(
new DuplicateRemover() {
@Override
public boolean isDuplicate(Request request, Task task) {
if (!inited.get()) {
init(task);
}
return !urls.add(request.getUrl());
}

@Override
public void resetDuplicateCheck(Task task) {
urls.clear();
}

@Override
public int getTotalRequestsCount(Task task) {
return urls.size();
}
});
BloomFilterDuplicateRemover bloomFilterDuplicateRemover = new BloomFilterDuplicateRemover(this.filePath.hashCode());
setDuplicateRemover(bloomFilterDuplicateRemover);
}

private void initFlushThread() {
flushThreadPool = Executors.newScheduledThreadPool(1);
flushThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
flush();
}
}, 10, 10, TimeUnit.SECONDS);
flushThreadPool = Executors.newScheduledThreadPool(1);
flushThreadPool.scheduleAtFixedRate(this::flush, 10, 10, TimeUnit.SECONDS);
}

private void initWriter() {
Expand All @@ -127,7 +86,6 @@ private void initWriter() {
private void readFile() {
try {
queue = new LinkedBlockingQueue<Request>();
urls = new LinkedHashSet<String>();
readCursorFile();
readUrlFile();
// initDuplicateRemover();
Expand All @@ -140,46 +98,43 @@ private void readFile() {
}

private void readUrlFile() throws IOException {
String line;
BufferedReader fileUrlReader = null;
try {
fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)));
try (BufferedReader fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)))) {
String line;
int lineReaded = 0;
while ((line = fileUrlReader.readLine()) != null) {
urls.add(line.trim());
Request request = deserializeRequest(line);
this.getDuplicateRemover().isDuplicate(request, null);
lineReaded++;
if (lineReaded > cursor.get()) {
queue.add(deserializeRequest(line));
queue.add(request);
}
}
} finally {
if (fileUrlReader != null) {
IOUtils.closeQuietly(fileUrlReader);
}
}
}

private void readCursorFile() throws IOException {
BufferedReader fileCursorReader = null;
try {
fileCursorReader = new BufferedReader(new FileReader(getFileName(fileCursor)));
String fileName = getFileName(fileCursor);
try (BufferedReader fileCursorReader = new BufferedReader(new FileReader(fileName))) {
String line;
String lastLine = null;
//read the last number
while ((line = fileCursorReader.readLine()) != null) {
cursor = new AtomicInteger(NumberUtils.toInt(line));
line = line.trim();
if (!line.isEmpty()) {
lastLine = line;
}
}
} finally {
if (fileCursorReader != null) {
IOUtils.closeQuietly(fileCursorReader);
if (lastLine != null) {
cursor.set(NumberUtils.toInt(line));
}
}
}

public void close() throws IOException {
flushThreadPool.shutdown();
fileUrlWriter.close();
fileCursorWriter.close();
}
flushThreadPool.shutdown();
fileUrlWriter.close();
fileCursorWriter.close();
}

private String getFileName(String filename) {
return filePath + task.getUUID() + filename;
Expand Down