-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcfadmin
executable file
·108 lines (102 loc) · 3.36 KB
/
cfadmin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#!/usr/bin/env python
# Author: Chris Moyer
#
# cfadmin is the CloudFront counterpart of sdbadmin: a simple console
# utility for performing the most frequent CloudFront tasks.
#
def _print_distributions(dists):
"""Internal function to print out all the distributions provided"""
print "%-12s %-50s %s" % ("Status", "Domain Name", "Origin")
print "-"*80
for d in dists:
print "%-12s %-50s %-30s" % (d.status, d.domain_name, d.origin)
for cname in d.cnames:
print " "*12, "CNAME => %s" % cname
print ""
# Command handler for "help".
#   cf  -- the CloudFront connection; accepted because every command
#          receives it as first argument, but not used here.
#   fnc -- optional name of a command to describe.
# With ``fnc``: prints that function's docstring and a usage line built from
# its signature, or exits with status 2 if ``__main__`` has no function of
# that name.  Without ``fnc``: lists every public function in ``__main__``
# with its docstring.  In both cases the process then exits with status 1.
def help(cf, fnc=None):
    """Print help message, optionally about a specific function"""
    import inspect
    # Imported locally so this function does not depend on the ``sys`` name
    # that only the ``__main__`` block binds at module level.
    import sys

    self = sys.modules['__main__']
    if fnc:
        # A missing attribute is the only expected failure; default to None
        # instead of swallowing every exception.
        cmd = getattr(self, fnc, None)
        if not inspect.isfunction(cmd):
            print("No function named: %s found" % fnc)
            sys.exit(2)
        # getargspec was removed in Python 3.11; prefer getfullargspec when
        # it exists.  Both return (args, varargs, ...) as their first items.
        getspec = getattr(inspect, "getfullargspec", None) or inspect.getargspec
        spec = getspec(cmd)
        arg_names, varargs = spec[0], spec[1]
        # Skip the first parameter: it is the connection passed by the
        # dispatcher, not something the user types.
        usage = ["[%s]" % a for a in arg_names[1:]]
        if varargs:
            # Show *args parameters too (e.g. the paths of "invalidate").
            usage.append("[%s ...]" % varargs)
        print(cmd.__doc__)
        print("Usage: %s %s" % (fnc, " ".join(usage)))
    else:
        print("Usage: cfadmin [command]")
        for cname in dir(self):
            # Names starting with "_" are internal helpers, not commands.
            if cname.startswith("_"):
                continue
            cmd = getattr(self, cname)
            if inspect.isfunction(cmd):
                print("\t%s - %s" % (cname, cmd.__doc__))
    sys.exit(1)
# Command handler for "ls".
#   cf -- CloudFront connection; must provide get_all_distributions() and
#         get_all_streaming_distributions().
# Prints a heading and a table for each of the two distribution kinds via
# _print_distributions.  Returns nothing.
def ls(cf):
    """List all distributions and streaming distributions"""
    # Single-argument print(...) behaves the same on Python 2 and parses on
    # Python 3 as well.
    print("Standard Distributions")
    _print_distributions(cf.get_all_distributions())
    print("Streaming Distributions")
    _print_distributions(cf.get_all_streaming_distributions())
# Command handler for "invalidate".
#   cf           -- CloudFront connection; must provide
#                   get_all_distributions() and
#                   create_invalidation_request(dist_id, paths).
#   origin_or_id -- a distribution id, or the DNS name of its origin.
#   *paths       -- paths to invalidate; when none are given they are read
#                   from stdin, one per line (blank lines ignored).
# Exits with status 1 if no paths are supplied at all or if no standard
# distribution matches ``origin_or_id``.
def invalidate(cf, origin_or_id, *paths):
    """Create a cloudfront invalidation request"""
    # Imported locally so this function does not depend on the ``sys`` name
    # that only the ``__main__`` block binds at module level.
    import sys

    # Allow paths to be passed using stdin
    if not paths:
        stripped = (line.strip() for line in sys.stdin)
        paths = [p for p in stripped if p]
    if not paths:
        # Nothing to invalidate: stop here rather than submit an empty
        # request.
        print("No paths given to invalidate")
        sys.exit(1)

    dist = None
    for d in cf.get_all_distributions():
        if origin_or_id in (d.id, d.origin.dns_name):
            dist = d
            break
    if dist is None:
        print("Distribution not found: %s" % origin_or_id)
        sys.exit(1)
    cf.create_invalidation_request(dist.id, paths)
# Command handler for "listinvalidations".
#   cf           -- CloudFront connection; must provide
#                   get_all_distributions() and
#                   get_invalidation_requests(dist_id).
#   origin_or_id -- a distribution id, or the DNS name of its origin.
# Prints one line per invalidation request: "id status", and for requests
# whose status is "InProgress" the full request is fetched so its paths can
# be printed as well.  Exits with status 1 if no standard distribution
# matches ``origin_or_id``.
def listinvalidations(cf, origin_or_id):
    """List invalidation requests for a given origin"""
    # Imported locally so this function does not depend on the ``sys`` name
    # that only the ``__main__`` block binds at module level.
    import sys

    dist = None
    for d in cf.get_all_distributions():
        if origin_or_id in (d.id, d.origin.dns_name):
            dist = d
            break
    if dist is None:
        print("Distribution not found: %s" % origin_or_id)
        sys.exit(1)

    results = cf.get_invalidation_requests(dist.id)
    if results:
        for summary in results:
            if summary.status == "InProgress":
                # The listing entry is only used for id/status here; the
                # paths are read from the fetched request.
                request = summary.get_invalidation_request()
                # "%s" formatting applies str() to each value, matching the
                # old comma-separated print statement.
                print("%s %s %s" % (request.id, request.status, request.paths))
            else:
                print("%s %s" % (summary.id, summary.status))
# Script entry point: connect to CloudFront, look up the command named by
# the first command-line argument among this module's public functions, and
# call it with the connection followed by the remaining arguments.  With no
# argument, or an unknown one, the "help" command runs instead.
if __name__ == "__main__":
    import inspect
    import boto
    # Bound at module level on purpose: the command functions above refer
    # to ``sys`` as a global.
    import sys
    cf = boto.connect_cloudfront()
    self = sys.modules['__main__']

    cmd = None
    args = []
    if len(sys.argv) >= 2:
        name = sys.argv[1]
        # Only public functions are commands.  This keeps module globals
        # such as ``boto``, ``sys`` or ``cf`` and "_"-prefixed helpers from
        # being dispatched to.
        candidate = None if name.startswith("_") else getattr(self, name, None)
        if inspect.isfunction(candidate):
            cmd = candidate
        args = sys.argv[2:]
    if cmd is None:
        cmd = help

    try:
        cmd(cf, *args)
    except TypeError as e:
        # Treated as "called with the wrong arguments": show the error and
        # the command's usage.  NOTE(review): a TypeError raised inside the
        # command body is reported the same way.
        print(e)
        help(cf, cmd.__name__)