Commit

added code to verify top k using baseline/single lookup approach
vibhaa committed Jul 28, 2016
1 parent 147d8bc commit baa02a3
Showing 16 changed files with 754 additions and 75 deletions.
Binary file added Analysis/Caida/Caida23MAggVerification.xlsx
Binary file not shown.
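At a high level, the verification this commit adds works as follows: shuffle the input stream, sort a copy by true count to fix the expected top-k heavy hitters, run the stream through the d-left table, then sort the table's buckets and compare the observed top k against the expected set. A minimal, self-contained sketch of that comparison (the Flow class and verify method are illustrative stand-ins, not the repository's exact API):

import java.util.*;

// Sketch of the baseline top-k check: sort flows by true count, take the
// top k as expected heavy hitters, and diff them against the reported top k.
// "Flow" is a hypothetical stand-in for the repo's FlowWithCount class.
public class TopKVerifierSketch {
    static class Flow {
        final long flowid;
        final long count;
        Flow(long flowid, long count) { this.flowid = flowid; this.count = count; }
    }

    static void verify(List<Flow> truth, List<Flow> reported, int k) {
        Comparator<Flow> byCountDesc = (a, b) -> Long.compare(b.count, a.count);
        truth.sort(byCountDesc);
        reported.sort(byCountDesc);

        Set<Long> expectedHH = new HashSet<>();
        for (int i = 0; i < k && i < truth.size(); i++)
            expectedHH.add(truth.get(i).flowid);

        Set<Long> observedHH = new HashSet<>();
        for (int i = 0; i < k && i < reported.size(); i++)
            observedHH.add(reported.get(i).flowid);

        int falsePositives = 0, falseNegatives = 0;
        for (long f : observedHH) if (!expectedHH.contains(f)) falsePositives++; // reported, not truly top-k
        for (long f : expectedHH) if (!observedHH.contains(f)) falseNegatives++; // truly top-k, not reported
        System.out.println(k + "," + falsePositives + "," + falseNegatives);
    }
}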
31 changes: 23 additions & 8 deletions code/AggregateModelVerifier.java
@@ -76,35 +76,45 @@ public static void runExperiment(SummaryStructureType type, ArrayList<FlowWithCo
float occupiedSlots[] = new float[k.length];
float duplicates[] = new float[k.length]; // number of duplicate entries in the table
double cumDeviation[] = new double[k.length]; // cumulative deviation in reported size across all flows
int nonHHCompetitors[] = new int[D + 1]; // histogram over trials: nonHHCompetitors[j] counts packets that found j expected heavy hitters among their D candidate buckets

// track the unique heavy hitters
DLeftHashTable lostFlowHashTable = null;
HashMap<Long, Long> observedHH = new HashMap<Long, Long>();

// index into k[] selecting which k is used when comparing heavy-hitter sets
int comp_index = 0;

for (int t = 0; t < numberOfTrials; t++){
Collections.shuffle(inputStream);

if (t < 2){
/*if (t < 2){
for (int i = 0; i < 15; i++)
System.err.println(inputStream.get(i).flowid + "," + inputStream.get(i).count);
System.err.println("new trial");
}
}*/

lostFlowHashTable = new DLeftHashTable(tableSize, type, inputStream.size(), D);

// the input is known, so sorting it yields the ideal order of heavy hitters
FlowWithCount[] inputStreamArray = new FlowWithCount[inputStream.size()];
inputStreamArray = inputStream.toArray(inputStreamArray);
Arrays.sort(inputStreamArray);

// the first k flows of the sorted input are the expected HH; this fixes which heavy hitters are examined for the CDF of competitors
expectedHH = new HashSet<Long>();
for (int i = 0; i < k[comp_index]; i++){
expectedHH.add(inputStreamArray[i].flowid);
}

int count = 0;
for (FlowWithCount f : inputStream){
lostFlowHashTable.processAggData(f.flowid, count++, f.count);
lostFlowHashTable.processAggData(f.flowid, count++, f.count, nonHHCompetitors, expectedHH);
}

// observed flows in sorted order so that we can pick the hh as the top k
FlowWithCount[] outputFlowBuckets = Arrays.copyOf(lostFlowHashTable.getBuckets(), lostFlowHashTable.getBuckets().length);
Arrays.sort(outputFlowBuckets);

// the input is known, so sorting it yields the ideal order of heavy hitters
FlowWithCount[] inputStreamArray = new FlowWithCount[inputStream.size()];
inputStreamArray = inputStream.toArray(inputStreamArray);
Arrays.sort(inputStreamArray);

cumDroppedPacketInfoCount += lostFlowHashTable.getDroppedPacketInfoCount();

@@ -165,6 +175,11 @@ public static void runExperiment(SummaryStructureType type, ArrayList<FlowWithCo
System.out.print(expectedSize[k_index] + "," + observedSize[k_index] + "," + (double) hhPacketsReported[k_index]/hhPacketCount[k_index]);
System.out.println("," + cumDeviation[k_index]/numberOfTrials + "," + numberOfFalsePositives[k_index] + "," + numberOfFalseNegatives[k_index]);
}

System.err.print(tableSize + "," + k[comp_index] + "," + D + ",");
for (int j = 0; j <= D; j++)
System.err.print(nonHHCompetitors[j]/numberOfTrials + ",");
System.err.println();
}

public static void main(String[] args){
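Note that nonHHCompetitors[j]/numberOfTrials in the System.err report above is integer division, so per-trial averages below 1 print as 0. A standalone sketch of the same report with floating-point averaging (the method wrapper is illustrative; the variable names mirror the diff):

// Per-trial average of the competitor histogram. nonHHCompetitors[j] is the
// number of packets, summed over all trials, that found j expected heavy
// hitters among their D candidate buckets.
static void printCompetitorHistogram(int[] nonHHCompetitors, int numberOfTrials,
                                     int tableSize, int k, int D) {
    System.err.print(tableSize + "," + k + "," + D + ",");
    for (int j = 0; j <= D; j++)
        System.err.print((double) nonHHCompetitors[j] / numberOfTrials + ",");
    System.err.println();
}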
11 changes: 9 additions & 2 deletions code/DLeftHashTable.java
@@ -499,7 +499,7 @@ else if (type == SummaryStructureType.EvictionWithCount)
}
}

public void processAggData(long key, int keynum, long value){
public void processAggData(long key, int keynum, long value, int[] nonHHCompetitors, HashSet<Long> expectedHH){
// hardcoded values for the hash functions given that the number of flows is 100
final int P = 5171;
final int hashA[] = { 421, 199, 79, 83, 89, 97, 101, 103, 107, 109, 113,
@@ -546,6 +546,8 @@ public void processAggData(long key, int keynum, long value){
int k = 0;
int firstLocation = 0; // how to track this in hardware

int currentCompetitors = 0; // number of expected heavy hitters seen among this packet's D candidate buckets

if (key == 0)
{
System.out.print("invalid Key");
@@ -572,6 +574,9 @@ public void processAggData(long key, int keynum, long value){
break;
}

if (expectedHH.contains(buckets[index].flowid))
currentCompetitors++;

// track min - first time explicitly set the value
if (buckets[index].count < minValue || k == 0){
minValue = buckets[index].count;
@@ -580,6 +585,8 @@
}
}

nonHHCompetitors[currentCompetitors] += 1;

boolean isAggregateData = true;
// none of the D locations were free
if (k == D) {
@@ -633,7 +640,7 @@ public void basicHeuristic(int minIndex, long key, boolean isAggregateData, long
else {
droppedPacketInfoCount = droppedPacketInfoCount + (int) buckets[minIndex].count;
buckets[minIndex].flowid = key;
buckets[minIndex].count = 1;
buckets[minIndex].count = 1; // replace with min+1
}
}

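For context on the hunks above: the competitor count is accumulated inside the d-left probe loop. Each packet hashes to one candidate bucket in each of the D subtables, and every candidate already holding an expected heavy hitter counts as one competitor; the histogram bin for that count is then incremented once per packet. A condensed sketch of the pattern, assuming the ((a*key + b) mod P) hash family from the file and a flat bucket array split into D equal subtables (the layout and signature are assumptions for illustration):

// Count how many of this packet's D candidate buckets already hold an
// expected heavy hitter, then bump the histogram bin for that count (0..D).
static void countCompetitors(long key, long[] bucketFlowIds, int tableSize, int D,
                             int[] hashA, int[] hashB, int P,
                             java.util.HashSet<Long> expectedHH,
                             int[] nonHHCompetitors) {
    int subtableSize = tableSize / D;
    int currentCompetitors = 0;
    for (int i = 0; i < D; i++) {
        // probe subtable i; assumes non-negative keys, as in the repo's setup
        int index = (int) (((hashA[i] * key + hashB[i]) % P) % subtableSize)
                    + i * subtableSize;
        if (expectedHH.contains(bucketFlowIds[index]))
            currentCompetitors++;
    }
    nonHHCompetitors[currentCompetitors] += 1;
}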
94 changes: 66 additions & 28 deletions code/LossyFlowIdentifier.java
@@ -13,6 +13,8 @@ public class LossyFlowIdentifier{
private static HashSet<Long> expectedHH;
private static HashMap<Long, Integer> flowSizes;
private static ArrayList<String> flowsToBeLost;

private static double accuracy = 0.99;
/*public static PriorityQueue<LossyFlow> HeapOfLossyFlows;
private class BucketMatrixIndex{
int hashfunctionIndex;
@@ -67,6 +69,8 @@ public static void runLossIdentificationTrials(SummaryStructureType type, Sketch
else
lostFlowHashTable = new DLeftHashTable(tableSize, type, lostPacketStream.size(), D);

Collections.shuffle(lostPacketStream); // randomizing the order

int count = 0;
for (Packet p : lostPacketStream){
//lostPacketSketch.updateCountInSketch(p);
@@ -427,7 +431,7 @@ public static void runSizeDifferenceMeasurementOnSketch(SummaryStructureType typ
}
}

public static void runTrialsOnSketch(SummaryStructureType type, ArrayList<Packet> lostPacketStream, double[] threshold, int totalMemory, int D, long thr_totalPackets){
public static void runTrialsPerThreshold(SummaryStructureType type, ArrayList<Packet> lostPacketStream, double[] threshold, int totalMemory, int D, long thr_totalPackets){
int numberOfTrials = 1000;
int observedSize[] = new int[threshold.length];
int expectedSize[] = new int[threshold.length];
@@ -461,10 +465,14 @@ public static void runTrialsOnSketch(SummaryStructureType type, ArrayList<Packet
HashMap<Long, Long> observedHH;
HashMap<Long, Long> observedHHfromDump;
for (int t = 0; t < numberOfTrials; t++){

Collections.shuffle(lostPacketStream);

for (int thr_index = 0; thr_index < threshold.length; thr_index++){
// find the expected HH in the idealized 100%-accuracy case
expectedHH = new HashSet<Long>();
observedHHfromDump = new HashMap<Long, Long>();
observedHH = new HashMap<Long, Long>();
for (String f : flowsToBeLost){
if (flowSizes.get(FlowDataParser.convertAddressToLong(f)) > (int) (threshold[thr_index] * lostPacketStream.size())){
expectedHH.add(FlowDataParser.convertAddressToLong(f));
@@ -475,47 +483,68 @@ public static void runTrialsOnSketch(SummaryStructureType type, ArrayList<Packet

//System.out.println("cacheSize" + cacheSize);
// track the unique lost flows
CountMinWithCache cmsketch = new CountMinWithCache(totalMemory, type, lostPacketStream.size(), D, cacheSize[thr_index], threshold[thr_index]);
CountMinWithCache cmsketch = null;
SampleAndHold flowMemoryFromSampling = null;
double thresholdCount = lostPacketStream.size() * threshold[thr_index];
double samplingProb = (1 - Math.pow(1 - accuracy, 1/thresholdCount));

if (type == SummaryStructureType.SampleAndHold)
flowMemoryFromSampling = new SampleAndHold(totalMemory, type, lostPacketStream.size(), samplingProb);
else
cmsketch = new CountMinWithCache(totalMemory, type, lostPacketStream.size(), D, cacheSize[thr_index], threshold[thr_index]);

for (Packet p : lostPacketStream){
cmsketch.processData(p.getSrcIp(), thr_totalPackets);
if (type == SummaryStructureType.SampleAndHold)
flowMemoryFromSampling.processData(p.getSrcIp());
else
cmsketch.processData(p.getSrcIp(), thr_totalPackets);
}

// get the heavy hitters from a dump of the cache and track them separately
if (type == SummaryStructureType.CountMinCacheWithKeys){
for (FlowWithCount f : cmsketch.getCache()){
//System.out.println(f.flowid + " " + f.count);
if (f.count > threshold[thr_index]*lostPacketStream.size()){
observedHHfromDump.put(f.flowid, f.count);
observedHHfromDump.put(f.flowid, f.count);
}
}
}
observedSizeFromDump[thr_index] = observedHHfromDump.size();


//get the heavy hitters and clean them up
observedHH = cmsketch.getHeavyHitters();/* new HashMap<Long, Long>();*/
//System.out.print("Before cleaning:" + cmsketch.getHeavyHitters().size());
ArrayList<Long> flowsToRemove = new ArrayList<Long>();
for (long flowid : cmsketch.getHeavyHitters().keySet()) {
if (type == SummaryStructureType.CountMinCacheNoKeys && cmsketch.getHeavyHitters().get(flowid) > threshold[thr_index]*lostPacketStream.size())
observedHH.put(flowid, cmsketch.getHeavyHitters().get(flowid));
if (type == SummaryStructureType.CountMinCacheWithKeys && observedHH.get(flowid) <= threshold[thr_index]*lostPacketStream.size()){
// check if the cache has a more up-to-date value that would account for this particular flowid being a hh
// you would technically hash on this flowid and look up that index -- eliminated that part
if (!observedHHfromDump.containsKey(flowid))
flowsToRemove.add(flowid);
else if (observedHHfromDump.get(flowid) <= threshold[thr_index]*lostPacketStream.size())
flowsToRemove.add(flowid);
// get the heavy hitters from the sample and hold flow memory
if (type == SummaryStructureType.SampleAndHold){
cacheSize[thr_index] = flowMemoryFromSampling.getBuckets().size();
for (Long f : flowMemoryFromSampling.getBuckets().keySet()){
if (flowMemoryFromSampling.getBuckets().get(f) > threshold[thr_index]*lostPacketStream.size())
observedHH.put(f, flowMemoryFromSampling.getBuckets().get(f));
}
}
else {
//get the heavy hitters and clean them up
observedHH = cmsketch.getHeavyHitters();
ArrayList<Long> flowsToRemove = new ArrayList<Long>();
for (long flowid : cmsketch.getHeavyHitters().keySet()) {
if (type == SummaryStructureType.CountMinCacheNoKeys && cmsketch.getHeavyHitters().get(flowid) > threshold[thr_index]*lostPacketStream.size())
observedHH.put(flowid, cmsketch.getHeavyHitters().get(flowid));
if (type == SummaryStructureType.CountMinCacheWithKeys && observedHH.get(flowid) <= threshold[thr_index]*lostPacketStream.size()){
// check if the cache has a more up-to-date value that would account for this particular flowid being a hh
// you would technically hash on this flowid and look up that index -- eliminated that part
if (!observedHHfromDump.containsKey(flowid))
flowsToRemove.add(flowid);
else if (observedHHfromDump.get(flowid) <= threshold[thr_index]*lostPacketStream.size())
flowsToRemove.add(flowid);
}
}
for (long flowid : flowsToRemove)
observedHH.remove(flowid);
//System.out.println("after cleaning: " + observedHH.size());
}
for (long flowid : flowsToRemove)
observedHH.remove(flowid);
//System.out.println("after cleaning: " + observedHH.size());

observedSize[thr_index] = observedHH.size();
occupancy[thr_index] += (float) cmsketch.getSketch().getOccupancy();
controllerReportCount[thr_index] += (float) cmsketch.getControllerReports();
if (type != SummaryStructureType.SampleAndHold) {
occupancy[thr_index] += (float) cmsketch.getSketch().getOccupancy();
controllerReportCount[thr_index] += (float) cmsketch.getControllerReports();
}

int bigLoserPacketsLost = 0;
int flag = 0;
@@ -717,26 +746,35 @@ else if (args[3].contains("coalesce"))
//}
}
}
else if (args[2].equals("countMin")){
else if (args[2].equals("PerThreshold")){
System.out.print("totalMemory," + "cacheSize," + "threshold," + "D," + "FalsePositive %," + "False Negative %," + "expected number, reported number, hhReportedFraction, deviation, table occupancy, thr_totalPackets, Controlleer Report Count");
if (args[3].contains("Keys"))
System.out.print("FalsePositiveinDump %," + "False Negativ in Dump %," + "expected number, reported number in dump, hhReportedFraction in dump, deviation in dump,");
System.out.println();
int tempCount = 0;

for (int tableSize_index = 0; tableSize_index < tableSize.length; tableSize_index++) {
//for (long thr_totalPackets = 100000; thr_totalPackets <= 500000; thr_totalPackets += 100000)
if (tempCount != 0)
continue;

if (args[3].contains("SampleAndHold")) {
runTrialsPerThreshold(SummaryStructureType.SampleAndHold, lostPacketStream, threshold, tableSize[tableSize_index], 0, 0);
tempCount++;
continue;
}
for (long thr_totalPackets = 0; thr_totalPackets <= 0; thr_totalPackets += 100000){
for (int D = 3; D <= 3; D++){
//System.out.println(expectedHH.size() + " " + totalPacketsLost);
//System.out.println(totalPacketsLost + " " + count);

// run the loss identification trials for the appropriate heuristic
if (args[3].contains("NoKeyNoRepBit"))
runTrialsOnSketch(SummaryStructureType.CountMinCacheNoKeys, lostPacketStream, threshold, tableSize[tableSize_index]*9, D, thr_totalPackets);
runTrialsPerThreshold(SummaryStructureType.CountMinCacheNoKeys, lostPacketStream, threshold, tableSize[tableSize_index]*9, D, thr_totalPackets);
else if (args[3].contains("NoKeyRepBit"))
runTrialsOnSketch(SummaryStructureType.CountMinCacheNoKeysReportedBit, lostPacketStream, threshold, tableSize[tableSize_index]*9, D, thr_totalPackets);
runTrialsPerThreshold(SummaryStructureType.CountMinCacheNoKeysReportedBit, lostPacketStream, threshold, tableSize[tableSize_index]*9, D, thr_totalPackets);
else if (args[3].contains("Keys"))
runTrialsOnSketch(SummaryStructureType.CountMinCacheWithKeys, lostPacketStream, threshold, tableSize[tableSize_index]*9, D, thr_totalPackets);
runTrialsPerThreshold(SummaryStructureType.CountMinCacheWithKeys, lostPacketStream, threshold, tableSize[tableSize_index]*9, D, thr_totalPackets);
}
}
}
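The sampling probability used for SampleAndHold above follows a standard argument: a flow with at least thresholdCount packets is missed only if every one of its packets goes unsampled, which happens with probability (1 - p)^thresholdCount. Setting that miss probability equal to 1 - accuracy and solving for p gives p = 1 - (1 - accuracy)^(1/thresholdCount), the exact expression in the diff. A small numeric check (the threshold value is illustrative, not taken from the experiments):

public class SamplingProbCheck {
    public static void main(String[] args) {
        double accuracy = 0.99;         // target probability of catching a heavy hitter
        double thresholdCount = 1000.0; // illustrative heavy-hitter size in packets
        double p = 1 - Math.pow(1 - accuracy, 1 / thresholdCount);
        double miss = Math.pow(1 - p, thresholdCount); // chance all its packets go unsampled
        System.out.println("p    = " + p);    // ~0.0046
        System.out.println("miss = " + miss); // ~0.01 == 1 - accuracy
    }
}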
20 changes: 10 additions & 10 deletions code/Packet.java
@@ -4,26 +4,26 @@
the key fields from the packet*/
public class Packet{
private long srcip;
private long dstip;
private String srcPort;
private String dstPort;
private String protocol;
//private long dstip;
//private String srcPort;
//private String dstPort;
//private String protocol;

public Packet(long srcip, long dstip, String srcPort, String dstPort, String protocol){
this.srcip = srcip;
this.dstip = dstip;
this.srcPort = new String(srcPort);
this.dstPort = new String(dstPort);
this.protocol = new String(protocol);
//this.dstip = dstip;
//this.srcPort = new String(srcPort);
//this.dstPort = new String(dstPort);
//this.protocol = new String(protocol);
}

public long getSrcIp(){
return srcip;
}

public long getDstIp(){
/*public long getDstIp(){
return dstip;
}
}*/

public String fivetuple(){
return Long.toString(srcip); /* + long.toString(dstip) + srcPort + dstPort + protocol;*/