Skip to content

Commit

Permalink
Various fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
dbishop committed Feb 9, 2012
1 parent 8bb8cf6 commit 1afb13e
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 21 deletions.
Binary file added debian/.changelog.swp
Binary file not shown.
3 changes: 2 additions & 1 deletion debian/changelog
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
python-ss-statsd (1.7-3ss2) oneiric; urgency=low
python-ss-statsd (1.7-3ss6) oneiric; urgency=low

* Added "queue" writer.
* Removed signal handling code, since we run the Server in a thread.

-- SwiftStack Inc. <contact@swiftstack.net> Wed, 08 Feb 2012 17:18:00 -0800

Expand Down
Binary file added pystatsd/.server.py.swp
Binary file not shown.
Binary file added pystatsd/.statsd.py.swp
Binary file not shown.
40 changes: 20 additions & 20 deletions pystatsd/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(self, pct_threshold=90, debug=False, transport='graphite',
self.buf = 8192
self.flush_interval = flush_interval
self.pct_threshold = pct_threshold
self.no_aggregate_counters = no_aggregate_counters

if transport == 'ganglia':
self.transport = TransportGanglia(
Expand All @@ -73,6 +74,7 @@ def __init__(self, pct_threshold=90, debug=False, transport='graphite',
dmax=int(self.flush_interval * 1.2),
# What hostname should these metrics be attached to.
ganglia_spoof_host=ganglia_spoof_host,
pct_threshold = pct_threshold,
)
elif transport == 'graphite':
self.transport = TransportGraphite(
Expand All @@ -81,12 +83,14 @@ def __init__(self, pct_threshold=90, debug=False, transport='graphite',
graphite_port=graphite_port,
counters_prefix=counters_prefix,
timers_prefix=timers_prefix,
pct_threshold = pct_threshold,
)
elif transport == 'graphite_queue':
self.transport = TransportGraphiteQueue(
queue=queue,
counters_prefix=counters_prefix,
timers_prefix=timers_prefix,
pct_threshold = pct_threshold,
)
else:
self.transport = TransportNop()
Expand All @@ -102,7 +106,7 @@ def process(self, data):

del bits[0]
if len(bits) == 0:
bits.append(0)
return

for bit in bits:
sample_rate = 1;
Expand Down Expand Up @@ -186,13 +190,6 @@ def serve(self, hostname='', port=8125):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.bind(addr)

import signal
def signal_handler(signal, frame):
self.stop()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

self._set_timer()
while True:
data, addr = self._sock.recvfrom(self.buf)
Expand All @@ -215,12 +212,13 @@ def stop(self):
self._sock.close()

class TransportGanglia(object):
def __init__(self, host, port, protocol, dmax, spoof_host):
def __init__(self, host, port, protocol, dmax, spoof_host, pct_threshold):
self.host = host
self.port = port
self.protocol = protocol
self.dmax = dmax
self.spoof_host = spoof_host
self.pct_threshold = pct_threshold
self.g = None

def start_flush(self):
Expand All @@ -231,7 +229,7 @@ def flush_counter(self, k, v, ts):
# first in the GUI. Change below if you disagree
self.g.send(k, v, "double", "count", "both", 60, self.dmax, "_counters", self.spoof_host)

def flush_timer(self, min, mean, max, count, max_threshold, ts):
def flush_timer(self, k, min, mean, max, count, max_threshold, ts):
# What group should these metrics be in. For the time being we'll set it to the name of the key
group = k
self.g.send(k + "_lower", min, "double", "time", "both", 60, self.dmax, group, self.spoof_host)
Expand All @@ -249,13 +247,14 @@ def finish_flush(self):


class TransportGraphite(object):
def __init__(self, host, port, counters_prefix, timers_prefix):
def __init__(self, host, port, counters_prefix, timers_prefix, pct_threshold):
self.host = host
self.port = port
if counters_prefix:
counters_prefix = '%s.' % (counters_prefix,)
self.counters_prefix = counters_prefix
self.timers_prefix = timers_prefix
self.pct_threshold = pct_threshold
self.graphite_socket = None

def start_flush(self):
Expand All @@ -266,7 +265,7 @@ def flush_counter(self, k, v, ts):
self.stat_string += msg


def flush_timer(self, min, mean, max, count, max_threshold, ts):
def flush_timer(self, k, min, mean, max, count, max_threshold, ts):
self.stat_string += TIMER_MSG % {
'prefix':self.timers_prefix,
'key':k,
Expand Down Expand Up @@ -296,11 +295,12 @@ def finish_flush(self):


class TransportGraphiteQueue(object):
def __init__(self, queue, counters_prefix, timers_prefix):
def __init__(self, queue, counters_prefix, timers_prefix, pct_threshold):
self.queue = queue
if counters_prefix:
counters_prefix = '%s.' % (counters_prefix,)
self.counters_prefix = counters_prefix
self.pct_threshold = pct_threshold
self.timers_prefix = timers_prefix

def start_flush(self):
Expand All @@ -311,14 +311,14 @@ def flush_counter(self, k, v, ts):
('%s%s' % (self.counters_prefix, k), (ts, v))
])

def flush_timer(self, min_v, mean, max_v, count, max_threshold, ts):
def flush_timer(self, k, min_v, mean, max_v, count, max_threshold, ts):
upper_n = 'upper_%s' % (self.pct_threshold,)
self.queue.put([
('.'.join(self.timers_prefix, k, 'lower'), (ts, min_v)),
('.'.join(self.timers_prefix, k, 'count'), (ts, count)),
('.'.join(self.timers_prefix, k, 'mean'), (ts, mean)),
('.'.join(self.timers_prefix, k, 'upper'), (ts, max_v)),
('.'.join(self.timers_prefix, k, upper_n), (ts, max_threshold)),
('.'.join([self.timers_prefix, k, 'lower']), (ts, min_v)),
('.'.join([self.timers_prefix, k, 'count']), (ts, count)),
('.'.join([self.timers_prefix, k, 'mean']), (ts, mean)),
('.'.join([self.timers_prefix, k, 'upper']), (ts, max_v)),
('.'.join([self.timers_prefix, k, upper_n]), (ts, max_threshold)),
])

def flush_statsd_stats(self, stats, ts):
Expand All @@ -337,7 +337,7 @@ def start_flush(self):
def flush_counter(self, k, v, ts):
pass

def flush_timer(self, min, mean, max, count, max_threshold, ts):
def flush_timer(self, k, min, mean, max, count, max_threshold, ts):
pass

def flush_statsd_stats(self, stats, ts):
Expand Down

0 comments on commit 1afb13e

Please sign in to comment.