forked from django-compressor/django-compressor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstorage.py
126 lines (99 loc) · 4.28 KB
/
storage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from __future__ import unicode_literals
import errno
import gzip
import os
from datetime import datetime
import time
from django.core.files.storage import FileSystemStorage, get_storage_class
from django.utils.functional import LazyObject, SimpleLazyObject
from compressor.conf import settings
class CompressorFileStorage(FileSystemStorage):
    """
    File system storage backend for the files django-compressor manages.

    When ``location`` or ``base_url`` is not supplied, the value of
    ``settings.COMPRESS_ROOT`` or ``settings.COMPRESS_URL`` is used instead.
    """

    def __init__(self, location=None, base_url=None, *args, **kwargs):
        """
        Initialise the storage, substituting the compressor defaults.

        Extra positional and keyword arguments are forwarded unchanged to
        the parent storage class.
        """
        root = settings.COMPRESS_ROOT if location is None else location
        url = settings.COMPRESS_URL if base_url is None else base_url
        super(CompressorFileStorage, self).__init__(
            root, url, *args, **kwargs)

    def accessed_time(self, name):
        """Return the access time of ``name`` as a ``datetime``."""
        atime = os.path.getatime(self.path(name))
        return datetime.fromtimestamp(atime)

    def created_time(self, name):
        """Return ``os.path.getctime`` of ``name`` as a ``datetime``."""
        ctime = os.path.getctime(self.path(name))
        return datetime.fromtimestamp(ctime)

    def modified_time(self, name):
        """Return the modification time of ``name`` as a ``datetime``."""
        mtime = os.path.getmtime(self.path(name))
        return datetime.fromtimestamp(mtime)

    def get_available_name(self, name, max_length=None):
        """
        Return ``name`` unchanged, deleting any file already stored under it.

        ``max_length`` is accepted but not used.
        """
        if not self.exists(name):
            return name
        self.delete(name)
        return name

    def delete(self, name):
        """
        Delete ``name``, ignoring the error raised if it no longer exists.

        Handles the deletion race condition present in Django prior to 1.4:
        https://code.djangoproject.com/ticket/16108
        """
        try:
            super(CompressorFileStorage, self).delete(name)
        except OSError as exc:
            # A missing file means there is nothing left to delete; any
            # other OS error is propagated to the caller.
            if exc.errno == errno.ENOENT:
                return
            raise
def _load_compressor_file_storage():
    """Look up and instantiate ``compressor.storage.CompressorFileStorage``."""
    storage_class = get_storage_class(
        'compressor.storage.CompressorFileStorage')
    return storage_class()


# Lazily evaluated module-level storage instance: the factory above is
# handed to ``SimpleLazyObject`` rather than being called here.
compressor_file_storage = SimpleLazyObject(_load_compressor_file_storage)
class GzipCompressorFileStorage(CompressorFileStorage):
    """
    File system storage that writes a gzipped ``.gz`` copy next to every
    file it saves.
    """

    def save(self, filename, content):
        """
        Save ``content`` and write a gzip-compressed sibling file.

        The file is first stored by the parent class; its bytes are then
        compressed into ``<path>.gz``. Returns the name returned by the
        parent's ``save``.
        """
        filename = super(GzipCompressorFileStorage, self).save(
            filename, content)
        source_path = self.path(filename)
        gzip_path = '%s.gz' % source_path

        with open(source_path, 'rb') as source:
            with open(gzip_path, 'wb') as target:
                with gzip.GzipFile(fileobj=target) as archive:
                    archive.write(source.read())

        # Give both files the same access/modification time. os.stat()
        # reports nanosecond resolution on Linux while os.utime() only sets
        # microsecond resolution, so the stamp is applied to both files
        # explicitly to make them equal.
        now = time.time()
        for path in (source_path, gzip_path):
            os.utime(path, (now, now))

        return filename
class BrotliCompressorFileStorage(CompressorFileStorage):
    """
    File system storage that writes a brotli-compressed ``.br`` copy next to
    every file it saves.
    """

    # Number of bytes read from the saved file per compression step.
    chunk_size = 1024

    def __init__(self, *args, **kwargs):
        """
        Import ``brotli`` and remember its ``Compressor`` class.

        The import is done here instead of at module level, so ``brotli``
        is only required once this storage is instantiated. All arguments
        are forwarded to the parent class.
        """
        import brotli
        self.Compressor = brotli.Compressor
        super(BrotliCompressorFileStorage, self).__init__(*args, **kwargs)

    def save(self, filename, content):
        """
        Save ``content`` and write a brotli-compressed sibling file.

        The file is first stored by the parent class; it is then read back
        in ``chunk_size`` pieces and compressed into ``<path>.br``. Returns
        the name returned by the parent's ``save``.
        """
        filename = super(BrotliCompressorFileStorage, self).save(
            filename, content)
        source_path = self.path(filename)
        brotli_path = '%s.br' % source_path
        compressor = self.Compressor()

        with open(source_path, 'rb') as source:
            with open(brotli_path, 'wb') as target:
                while True:
                    chunk = source.read(self.chunk_size)
                    if chunk == b'':
                        break
                    # If the compressor returned nothing for this chunk,
                    # flush it and write the flushed output instead.
                    target.write(
                        compressor.compress(chunk) or compressor.flush())
                target.write(compressor.finish())

        # Give both files the same access/modification time. os.stat()
        # reports nanosecond resolution on Linux while os.utime() only sets
        # microsecond resolution, so the stamp is applied to both files
        # explicitly to make them equal.
        now = time.time()
        for path in (source_path, brotli_path):
            os.utime(path, (now, now))

        return filename
class DefaultStorage(LazyObject):
    """
    Lazy wrapper around the storage class named by
    ``settings.COMPRESS_STORAGE``.
    """

    def _setup(self):
        """Instantiate the configured storage class and wrap it."""
        storage_class = get_storage_class(settings.COMPRESS_STORAGE)
        self._wrapped = storage_class()
# Module-level ``DefaultStorage`` instance; its ``_setup`` instantiates the
# storage class named by ``settings.COMPRESS_STORAGE``.
default_storage = DefaultStorage()