-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
Copy pathimagenet_to_gcs.py
470 lines (373 loc) · 16.1 KB
/
imagenet_to_gcs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
r"""Script to process the Imagenet dataset and upload to gcs.
To run the script, set up a virtualenv with the following libraries installed.
- `gcloud`: Follow the instructions on
[cloud SDK docs](https://cloud.google.com/sdk/downloads) followed by
installing the python api using `pip install gcloud`.
- `google-cloud-storage`: Install with `pip install google-cloud-storage`
- `tensorflow`: Install with `pip install tensorflow`
Once you have all the above libraries set up, you should register on the
[Imagenet website](http://image-net.org/download-images) and download the
ImageNet .tar files. They should be extracted and provided in the format:
- Training images: train/n03062245/n03062245_4620.JPEG
- Validation Images: validation/ILSVRC2012_val_00000001.JPEG
- Validation Labels: synset_labels.txt
To run the script to preprocess the raw dataset as TFRecords and upload to gcs,
run the following command:
```
python3 imagenet_to_gcs.py \
--project="TEST_PROJECT" \
--gcs_output_path="gs://TEST_BUCKET/IMAGENET_DIR" \
--raw_data_dir="path/to/imagenet" \
--local_scratch_dir="path/to/scratch"
```
"""
import math
import os
import random
from typing import Iterable, List, Mapping, Union, Tuple
from absl import app
from absl import flags
from absl import logging
import tensorflow.compat.v1 as tf
from google.cloud import storage
# Command-line flags. `project` and `gcs_output_path` are only validated (in
# `run`) when `gcs_upload` is true.
flags.DEFINE_string(
'project', None, 'Google cloud project id for uploading the dataset.')
flags.DEFINE_string(
'gcs_output_path', None, 'GCS path for uploading the dataset.')
# The TFRecord shards are written under this directory before any upload.
flags.DEFINE_string(
'local_scratch_dir', None, 'Scratch directory path for temporary files.')
flags.DEFINE_string(
'raw_data_dir', None, 'Directory path for raw Imagenet dataset. '
'Should have train and validation subdirectories inside it.')
flags.DEFINE_boolean(
'gcs_upload', True, 'Set to false to not upload to gcs.')
FLAGS = flags.FLAGS
# File under `raw_data_dir` holding the validation synsets, one per line.
LABELS_FILE = 'synset_labels.txt'
# Number of TFRecord files each split is divided into.
TRAINING_SHARDS = 1024
VALIDATION_SHARDS = 128
# Sub-directory names. They are used under `raw_data_dir` (inputs), under
# `local_scratch_dir` (outputs), and as the TFRecord file-name prefix.
TRAINING_DIRECTORY = 'train'
VALIDATION_DIRECTORY = 'validation'
def _check_or_create_dir(directory: str):
  """Creates `directory` with `tf.gfile.MakeDirs` unless it already exists."""
  if tf.gfile.Exists(directory):
    return
  tf.gfile.MakeDirs(directory)
def _int64_feature(value: Union[int, Iterable[int]]) -> tf.train.Feature:
  """Wraps an int, or an iterable of ints, in an int64 `tf.train.Feature`.

  Args:
    value: a single integer, or any iterable of integers (list, tuple, ...).

  Returns:
    A `tf.train.Feature` whose `int64_list` holds the given value(s).
  """
  # The signature promises any iterable, so do not special-case `list` only:
  # a tuple used to be wrapped as `[(1, 2)]` instead of being passed through.
  if isinstance(value, Iterable):
    values = list(value)
  else:
    values = [value]
  return tf.train.Feature(int64_list=tf.train.Int64List(value=values))
def _bytes_feature(value: Union[bytes, str]) -> tf.train.Feature:
  """Wraps a bytes or str value in a bytes `tf.train.Feature`.

  A `str` is UTF-8 encoded first; any other value is stored unchanged.
  """
  payload = bytes(value, 'utf-8') if isinstance(value, str) else value
  bytes_list = tf.train.BytesList(value=[payload])
  return tf.train.Feature(bytes_list=bytes_list)
def _convert_to_example(filename: str,
                        image_buffer: str,
                        label: int,
                        synset: str,
                        height: int,
                        width: int) -> tf.train.Example:
  """Packs one ImageNet image and its metadata into an Example proto.

  Args:
    filename: string, path to an image file, e.g., '/path/to/example.JPG'.
      Only its basename is stored.
    image_buffer: string, JPEG encoding of RGB image
    label: integer, identifier for the ground truth for the network
    synset: string, unique WordNet ID specifying the label, e.g., 'n02323233'
    height: integer, image height in pixels
    width: integer, image width in pixels

  Returns:
    Example proto
  """
  # Colorspace, channel count and format are the same for every record.
  feature = {
      'image/height': _int64_feature(height),
      'image/width': _int64_feature(width),
      'image/colorspace': _bytes_feature('RGB'),
      'image/channels': _int64_feature(3),
      'image/class/label': _int64_feature(label),
      'image/class/synset': _bytes_feature(synset),
      'image/format': _bytes_feature('JPEG'),
      'image/filename': _bytes_feature(os.path.basename(filename)),
      'image/encoded': _bytes_feature(image_buffer),
  }
  return tf.train.Example(features=tf.train.Features(feature=feature))
def _is_png(filename: str) -> bool:
"""Determines if a file contains a PNG format image.
Args:
filename: string, path of the image file.
Returns:
boolean indicating if the image is a PNG.
"""
# File list from:
# https://github.com/cytsai/ilsvrc-cmyk-image-list
return 'n02105855_2933.JPEG' in filename
def _is_cmyk(filename: str) -> bool:
"""Determines if file contains a CMYK JPEG format image.
Args:
filename: string, path of the image file.
Returns:
boolean indicating if the image is a JPEG encoded with CMYK color space.
"""
# File list from:
# https://github.com/cytsai/ilsvrc-cmyk-image-list
denylist = set(['n01739381_1309.JPEG', 'n02077923_14822.JPEG',
'n02447366_23489.JPEG', 'n02492035_15739.JPEG',
'n02747177_10752.JPEG', 'n03018349_4028.JPEG',
'n03062245_4620.JPEG', 'n03347037_9675.JPEG',
'n03467068_12171.JPEG', 'n03529860_11437.JPEG',
'n03544143_17228.JPEG', 'n03633091_5218.JPEG',
'n03710637_5125.JPEG', 'n03961711_5286.JPEG',
'n04033995_2932.JPEG', 'n04258138_17003.JPEG',
'n04264628_27969.JPEG', 'n04336792_7448.JPEG',
'n04371774_5854.JPEG', 'n04596742_4225.JPEG',
'n07583066_647.JPEG', 'n13037406_4650.JPEG'])
return os.path.basename(filename) in denylist
class ImageCoder(object):
  """Helper class that provides TensorFlow image coding utilities.

  Three small graphs (PNG -> JPEG, CMYK JPEG -> RGB JPEG, JPEG decode) are
  built once in the constructor and evaluated through one shared Session.

  NOTE(review): the methods are annotated as returning `tf.Tensor`, but they
  return whatever `Session.run` produces -- confirm and tighten the hints.
  """

  def __init__(self):
    # One Session serves every image coding call.
    self._sess = tf.Session()

    # PNG bytes -> RGB JPEG bytes.
    self._png_data = tf.placeholder(dtype=tf.string)
    png_pixels = tf.image.decode_png(self._png_data, channels=3)
    self._png_to_jpeg = tf.image.encode_jpeg(
        png_pixels, format='rgb', quality=100)

    # CMYK JPEG bytes -> RGB JPEG bytes. channels=0 leaves the channel count
    # up to the decoder instead of forcing it here.
    self._cmyk_data = tf.placeholder(dtype=tf.string)
    cmyk_pixels = tf.image.decode_jpeg(self._cmyk_data, channels=0)
    self._cmyk_to_rgb = tf.image.encode_jpeg(
        cmyk_pixels, format='rgb', quality=100)

    # RGB JPEG bytes -> decoded 3-channel image.
    self._decode_jpeg_data = tf.placeholder(dtype=tf.string)
    self._decode_jpeg = tf.image.decode_jpeg(
        self._decode_jpeg_data, channels=3)

  def png_to_jpeg(self, image_data: bytes) -> tf.Tensor:
    """Re-encodes PNG data as JPEG by running the PNG -> JPEG graph."""
    feed = {self._png_data: image_data}
    return self._sess.run(self._png_to_jpeg, feed_dict=feed)

  def cmyk_to_rgb(self, image_data: bytes) -> tf.Tensor:
    """Re-encodes CMYK JPEG data as RGB JPEG via the CMYK -> RGB graph."""
    feed = {self._cmyk_data: image_data}
    return self._sess.run(self._cmyk_to_rgb, feed_dict=feed)

  def decode_jpeg(self, image_data: bytes) -> tf.Tensor:
    """Decodes JPEG data and asserts the result is rank 3 with 3 channels."""
    feed = {self._decode_jpeg_data: image_data}
    decoded = self._sess.run(self._decode_jpeg, feed_dict=feed)
    assert len(decoded.shape) == 3
    assert decoded.shape[2] == 3
    return decoded
def _process_image(
    filename: str, coder: ImageCoder) -> Tuple[str, int, int]:
  """Reads one image file, normalizes it to RGB JPEG and measures it.

  Args:
    filename: string, path to an image file e.g., '/path/to/example.JPG'.
    coder: instance of ImageCoder to provide TensorFlow image coding utils.

  Returns:
    image_buffer: the encoded image data (re-encoded as RGB JPEG when the
      file was the known PNG or one of the known CMYK JPEGs).
    height: integer, image height in pixels.
    width: integer, image width in pixels.
  """
  with tf.gfile.FastGFile(filename, 'rb') as image_file:
    encoded = image_file.read()

  # Clean the dirty data: 1 image is a PNG, 22 JPEGs are in CMYK colorspace.
  if _is_png(filename):
    logging.info('Converting PNG to JPEG for %s', filename)
    encoded = coder.png_to_jpeg(encoded)
  elif _is_cmyk(filename):
    logging.info('Converting CMYK to RGB for %s', filename)
    encoded = coder.cmyk_to_rgb(encoded)

  # Decoding is done only to validate the data and to learn its dimensions;
  # the encoded bytes are what gets returned.
  pixels = coder.decode_jpeg(encoded)
  assert len(pixels.shape) == 3
  assert pixels.shape[2] == 3
  height, width = pixels.shape[0], pixels.shape[1]
  return encoded, height, width
def _process_image_files_batch(
    coder: ImageCoder,
    output_file: str,
    filenames: Iterable[str],
    synsets: Iterable[Union[str, bytes]],
    labels: Mapping[Union[str, bytes], int]):
  """Processes and saves a list of images as one TFRecord file.

  Args:
    coder: instance of ImageCoder to provide TensorFlow image coding utils.
    output_file: string, path of the TFRecord file to write.
    filenames: list of strings; each string is a path to an image file.
    synsets: list of WordNet IDs, parallel to `filenames`. Each one must be a
      key of `labels`.
    labels: map of synset to integer; id for all synset labels.

  Raises:
    KeyError: if a synset is missing from `labels`.
  """
  # The context manager closes the writer even when an image fails to
  # process, so a half-written shard never keeps its file handle open.
  with tf.python_io.TFRecordWriter(output_file) as writer:
    for filename, synset in zip(filenames, synsets):
      image_buffer, height, width = _process_image(filename, coder)
      example = _convert_to_example(
          filename, image_buffer, labels[synset], synset, height, width)
      writer.write(example.SerializeToString())
def _process_dataset(
    filenames: Iterable[str],
    synsets: Iterable[str],
    labels: Mapping[str, int],
    output_directory: str,
    prefix: str,
    num_shards: int) -> List[str]:
  """Splits the images into `num_shards` chunks and writes one file per chunk.

  Args:
    filenames: image file paths. Must support `len()` and slicing.
    synsets: WordNet IDs, parallel to `filenames`. Must support slicing.
    labels: map of synset to integer; id for all synset labels.
    output_directory: path where output files should be created. It is
      created if it does not exist.
    prefix: string; prefix for each file.
    num_shards: number of chunks to split the filenames into.

  Returns:
    files: list of tf-record filepaths created from processing the dataset.
  """
  _check_or_create_dir(output_directory)
  # Rounded up, so the last shard(s) may hold fewer images than the others.
  shard_size = int(math.ceil(len(filenames) / num_shards))
  coder = ImageCoder()

  record_paths = []
  for shard_index in range(num_shards):
    begin = shard_index * shard_size
    end = begin + shard_size
    shard_name = '%s-%.5d-of-%.5d' % (prefix, shard_index, num_shards)
    record_path = os.path.join(output_directory, shard_name)
    _process_image_files_batch(
        coder, record_path, filenames[begin:end], synsets[begin:end], labels)
    logging.info('Finished writing file: %s', record_path)
    record_paths.append(record_path)
  return record_paths
def convert_to_tf_records(
    raw_data_dir: str,
    local_scratch_dir: str) -> Tuple[List[str], List[str]]:
  """Converts the Imagenet dataset into TF-Record dumps.

  Args:
    raw_data_dir: directory containing the `train` and `validation`
      sub-directories and the `synset_labels.txt` file.
    local_scratch_dir: directory under which the `train` and `validation`
      TFRecord files are written.

  Returns:
    A `(training_records, validation_records)` tuple of TFRecord file paths.
  """
  # Shuffle training records to ensure we are distributing classes
  # across the batches. The fixed seed keeps the order reproducible.
  random.seed(0)

  # Glob all the training files and put them in shuffled order.
  training_files = tf.gfile.Glob(
      os.path.join(raw_data_dir, TRAINING_DIRECTORY, '*', '*.JPEG'))
  shuffle_idx = list(range(len(training_files)))
  random.shuffle(shuffle_idx)
  training_files = [training_files[i] for i in shuffle_idx]

  # The synset of a training image is the name of its parent directory. It is
  # encoded to bytes so it compares equal to the synsets read (in binary
  # mode) from the labels file below.
  training_synsets = [
      os.path.basename(os.path.dirname(f)).encode('utf-8')
      for f in training_files]

  # Glob all the validation files. They are sorted, and line i of the labels
  # file is used as the synset of the i-th sorted file.
  validation_files = sorted(tf.gfile.Glob(
      os.path.join(raw_data_dir, VALIDATION_DIRECTORY, '*.JPEG')))

  # Read the validation synsets; `with` makes sure the file gets closed.
  labels_path = os.path.join(raw_data_dir, LABELS_FILE)
  with tf.gfile.FastGFile(labels_path, 'rb') as labels_file:
    validation_synsets = labels_file.read().splitlines()

  # Files and labels are paired positionally and any surplus on either side
  # is silently dropped, so make a mismatch visible.
  if len(validation_files) != len(validation_synsets):
    logging.warning(
        'Found %d validation images but %d validation labels in %s; '
        'the surplus entries will be ignored.',
        len(validation_files), len(validation_synsets), labels_path)

  # Create unique ids (starting at 1) for all synsets.
  all_synsets = sorted(set(validation_synsets + training_synsets))
  labels = {synset: idx for idx, synset in enumerate(all_synsets, start=1)}

  # Create training data
  logging.info('Processing the training data.')
  training_records = _process_dataset(
      training_files, training_synsets, labels,
      os.path.join(local_scratch_dir, TRAINING_DIRECTORY),
      TRAINING_DIRECTORY, TRAINING_SHARDS)

  # Create validation data
  logging.info('Processing the validation data.')
  validation_records = _process_dataset(
      validation_files, validation_synsets, labels,
      os.path.join(local_scratch_dir, VALIDATION_DIRECTORY),
      VALIDATION_DIRECTORY, VALIDATION_SHARDS)

  return training_records, validation_records
def upload_to_gcs(training_records: Iterable[str],
                  validation_records: Iterable[str],
                  gcs_output_path: str,
                  gcs_project: str,
                  client: storage.Client = None):
  """Uploads TF-Record files to GCS, at provided path.

  Args:
    training_records: paths of the local training TFRecord files.
    validation_records: paths of the local validation TFRecord files.
    gcs_output_path: destination of the form `gs://BUCKET[/KEY_PREFIX]`.
    gcs_project: project used to create a storage client when `client` is
      not given.
    client: an optional storage client to use instead of creating one.

  Raises:
    ValueError: if `gcs_output_path` does not start with `gs://`.
  """
  scheme = 'gs://'
  if not gcs_output_path.startswith(scheme):
    raise ValueError('GCS output path must start with gs://')

  # Find the GCS bucket_name and key_prefix for dataset files. A non-empty
  # prefix always ends with exactly the '/' needed to join it to a file name.
  # An empty one (`gs://bucket` or `gs://bucket/`) stays empty, so objects
  # are not created with a leading '/' in their name.
  bucket_name, _, key_prefix = gcs_output_path[len(scheme):].partition('/')
  if key_prefix and not key_prefix.endswith('/'):
    key_prefix += '/'

  client = client or storage.Client(project=gcs_project)
  bucket = client.get_bucket(bucket_name)

  def _upload_files(filenames: Iterable[str]):
    """Uploads a list of files into a specific subdirectory."""
    for i, filename in enumerate(sorted(filenames)):
      blob = bucket.blob(key_prefix + os.path.basename(filename))
      blob.upload_from_filename(filename)
      # Only log every 20th file to keep the output readable.
      if i % 20 == 0:
        logging.info('Finished uploading file: %s', filename)

  # Upload training dataset
  logging.info('Uploading the training data.')
  _upload_files(training_records)

  # Upload validation dataset
  logging.info('Uploading the validation data.')
  _upload_files(validation_records)
def run(raw_data_dir: str,
        gcs_upload: bool,
        gcs_project: str,
        gcs_output_path: str,
        local_scratch_dir: str,
        client: storage.Client = None):
  """Runs the ImageNet preprocessing and uploading to GCS.

  All arguments are validated before any work starts, so a bad invocation
  fails immediately instead of part-way through the conversion.

  Args:
    raw_data_dir: str, the path to the folder with raw ImageNet data.
    gcs_upload: bool, whether or not to upload to GCS.
    gcs_project: str, the GCS project to upload to.
    gcs_output_path: str, the GCS bucket to write to.
    local_scratch_dir: str, the local directory path.
    client: An optional storage client.

  Raises:
    ValueError: if uploading is requested without a project or without a
      `gs://` output path, or if `local_scratch_dir` is missing.
    AssertionError: if `raw_data_dir` is missing.
  """
  if gcs_upload:
    if gcs_project is None:
      raise ValueError('GCS Project must be provided.')
    if gcs_output_path is None:
      raise ValueError('GCS output path must be provided.')
    if not gcs_output_path.startswith('gs://'):
      raise ValueError('GCS output path must start with gs://')

  if raw_data_dir is None:
    raise AssertionError(
        'The ImageNet download path is no longer supported. Please download '
        'and extract the .tar files manually and provide the `raw_data_dir`.')

  # The flag defaults to None. Without this check the failure is an opaque
  # TypeError from os.path.join inside the conversion step.
  if local_scratch_dir is None:
    raise ValueError('Scratch directory path must be provided.')

  # Convert the raw data into tf-records
  training_records, validation_records = convert_to_tf_records(
      raw_data_dir=raw_data_dir,
      local_scratch_dir=local_scratch_dir)

  # Upload to GCS
  if gcs_upload:
    upload_to_gcs(training_records=training_records,
                  validation_records=validation_records,
                  gcs_output_path=gcs_output_path,
                  gcs_project=gcs_project,
                  client=client)
def main(_):
  """absl entry point: forwards the parsed command-line flags to `run`."""
  run_kwargs = dict(
      raw_data_dir=FLAGS.raw_data_dir,
      gcs_upload=FLAGS.gcs_upload,
      gcs_project=FLAGS.project,
      gcs_output_path=FLAGS.gcs_output_path,
      local_scratch_dir=FLAGS.local_scratch_dir,
  )
  run(**run_kwargs)


if __name__ == '__main__':
  # Show the INFO-level progress messages logged during conversion/upload.
  logging.set_verbosity(logging.INFO)
  # The script uses TF1-style APIs (Session, placeholder).
  tf.disable_v2_behavior()
  app.run(main)