Skip to content

Commit

Permalink
optimize generation of rest responses and reduce copying bytes around
Browse files Browse the repository at this point in the history
  • Loading branch information
kimchy committed May 24, 2011
1 parent d44a796 commit 68a56a0
Show file tree
Hide file tree
Showing 19 changed files with 185 additions and 46 deletions.
1 change: 1 addition & 0 deletions .idea/projectCodeStyle.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.elasticsearch.common.Required;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -171,7 +171,7 @@ int querySourceLength() {
* @see org.elasticsearch.index.query.xcontent.QueryBuilders
*/
@Required public CountRequest query(QueryBuilder queryBuilder) {
FastByteArrayOutputStream bos = queryBuilder.buildAsUnsafeBytes();
BytesStream bos = queryBuilder.buildAsUnsafeBytes();
this.querySource = bos.unsafeByteArray();
this.querySourceOffset = 0;
this.querySourceLength = bos.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.elasticsearch.common.Required;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -123,7 +123,7 @@ byte[] querySource() {
* @see org.elasticsearch.index.query.xcontent.QueryBuilders
*/
@Required public DeleteByQueryRequest query(QueryBuilder queryBuilder) {
FastByteArrayOutputStream bos = queryBuilder.buildAsUnsafeBytes();
BytesStream bos = queryBuilder.buildAsUnsafeBytes();
this.querySource = bos.unsafeByteArray();
this.querySourceOffset = 0;
this.querySourceLength = bos.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.elasticsearch.common.Required;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -323,7 +323,7 @@ void beforeLocalFork() {
* more like this documents.
*/
public MoreLikeThisRequest searchSource(SearchSourceBuilder sourceBuilder) {
FastByteArrayOutputStream bos = sourceBuilder.buildAsUnsafeBytes(Requests.CONTENT_TYPE);
BytesStream bos = sourceBuilder.buildAsUnsafeBytes(Requests.CONTENT_TYPE);
this.searchSource = bos.unsafeByteArray();
this.searchSourceOffset = 0;
this.searchSourceLength = bos.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -256,7 +256,7 @@ public SearchRequest searchType(String searchType) throws ElasticSearchIllegalAr
* The source of the search request.
*/
public SearchRequest source(SearchSourceBuilder sourceBuilder) {
FastByteArrayOutputStream bos = sourceBuilder.buildAsUnsafeBytes(Requests.CONTENT_TYPE);
BytesStream bos = sourceBuilder.buildAsUnsafeBytes(Requests.CONTENT_TYPE);
this.source = bos.unsafeByteArray();
this.sourceOffset = 0;
this.sourceLength = bos.size();
Expand Down Expand Up @@ -347,7 +347,7 @@ public int sourceLength() {
* Allows to provide additional source that will be used as well.
*/
public SearchRequest extraSource(SearchSourceBuilder sourceBuilder) {
FastByteArrayOutputStream bos = sourceBuilder.buildAsUnsafeBytes(Requests.CONTENT_TYPE);
BytesStream bos = sourceBuilder.buildAsUnsafeBytes(Requests.CONTENT_TYPE);
this.extraSource = bos.unsafeByteArray();
this.extraSourceOffset = 0;
this.extraSourceLength = bos.size();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.io;

public interface BytesStream {

byte[] unsafeByteArray();

int size();

byte[] copiedByteArray();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
*
* @author kimchy (Shay Banon)
*/
public class FastByteArrayOutputStream extends OutputStream {
public class FastByteArrayOutputStream extends OutputStream implements BytesStream {

/**
* A thread local based cache of {@link FastByteArrayOutputStream}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

package org.elasticsearch.common.io.stream;

import org.elasticsearch.common.io.BytesStream;

import java.io.IOException;
import java.util.Arrays;

/**
* @author kimchy (shay.banon)
*/
public class BytesStreamOutput extends StreamOutput {
public class BytesStreamOutput extends StreamOutput implements BytesStream {

/**
* The buffer where data is stored.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.elasticsearch.common.xcontent;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.joda.time.ReadableInstant;
Expand Down Expand Up @@ -63,14 +65,6 @@ public static void globalFieldCaseConversion(FieldCaseConversion globalFieldCase
XContentBuilder.globalFieldCaseConversion = globalFieldCaseConversion;
}

private XContentGenerator generator;

private final OutputStream bos;

private FieldCaseConversion fieldCaseConversion = globalFieldCaseConversion;

private StringBuilder cachedStringBuilder;

/**
* Constructs a new cached builder over a cached (thread local) {@link FastByteArrayOutputStream}.
*/
Expand All @@ -85,13 +79,34 @@ public static XContentBuilder builder(XContent xContent) throws IOException {
return new XContentBuilder(xContent, new FastByteArrayOutputStream());
}


private XContentGenerator generator;

private final OutputStream bos;

private final Object payload;

private FieldCaseConversion fieldCaseConversion = globalFieldCaseConversion;

private StringBuilder cachedStringBuilder;


/**
* Constructs a new builder using the provided xcontent and an OutputStream. Make sure
* to call {@link #close()} when the builder is done with.
*/
public XContentBuilder(XContent xContent, OutputStream bos) throws IOException {
this(xContent, bos, null);
}

/**
* Constructs a new builder using the provided xcontent and an OutputStream. Make sure
* to call {@link #close()} when the builder is done with.
*/
public XContentBuilder(XContent xContent, OutputStream bos, @Nullable Object payload) throws IOException {
this.bos = bos;
this.generator = xContent.createGenerator(bos);
this.payload = payload;
}

public XContentBuilder fieldCaseConversion(FieldCaseConversion fieldCaseConversion) {
Expand Down Expand Up @@ -919,6 +934,14 @@ public void close() {
}
}

@Nullable public Object payload() {
return this.payload;
}

public OutputStream stream() {
return this.bos;
}

/**
* Returns the unsafe bytes (thread local bound). Make sure to use it with
* {@link #unsafeBytesLength()}.
Expand All @@ -927,7 +950,7 @@ public void close() {
*/
public byte[] unsafeBytes() throws IOException {
close();
return ((FastByteArrayOutputStream) bos).unsafeByteArray();
return ((BytesStream) bos).unsafeByteArray();
}

/**
Expand All @@ -938,15 +961,15 @@ public byte[] unsafeBytes() throws IOException {
*/
public int unsafeBytesLength() throws IOException {
close();
return ((FastByteArrayOutputStream) bos).size();
return ((BytesStream) bos).size();
}

/**
* Returns the actual stream used.
*/
public FastByteArrayOutputStream unsafeStream() throws IOException {
public BytesStream unsafeStream() throws IOException {
close();
return (FastByteArrayOutputStream) bos;
return (BytesStream) bos;
}

/**
Expand All @@ -956,7 +979,7 @@ public FastByteArrayOutputStream unsafeStream() throws IOException {
*/
public byte[] copiedBytes() throws IOException {
close();
return ((FastByteArrayOutputStream) bos).copiedByteArray();
return ((BytesStream) bos).copiedByteArray();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,27 @@

package org.elasticsearch.http.netty;

import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.netty.buffer.ChannelBuffer;
import org.elasticsearch.common.netty.buffer.ChannelBuffers;
import org.elasticsearch.common.netty.channel.Channel;
import org.elasticsearch.common.netty.channel.ChannelFuture;
import org.elasticsearch.common.netty.channel.ChannelFutureListener;
import org.elasticsearch.common.netty.handler.codec.http.*;
import org.elasticsearch.common.netty.handler.codec.http.Cookie;
import org.elasticsearch.common.netty.handler.codec.http.CookieDecoder;
import org.elasticsearch.common.netty.handler.codec.http.CookieEncoder;
import org.elasticsearch.common.netty.handler.codec.http.DefaultHttpResponse;
import org.elasticsearch.common.netty.handler.codec.http.HttpHeaders;
import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
import org.elasticsearch.common.netty.handler.codec.http.HttpResponseStatus;
import org.elasticsearch.common.netty.handler.codec.http.HttpVersion;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpException;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.XContentRestResponse;
import org.elasticsearch.transport.netty.NettyTransport;

import java.io.IOException;
import java.util.Set;
Expand Down Expand Up @@ -74,12 +85,27 @@ public NettyHttpChannel(Channel channel, org.elasticsearch.common.netty.handler.
}

// Convert the response content to a ChannelBuffer.
ChannelFutureListener releaseContentListener = null;
ChannelBuffer buf;
try {
if (response.contentThreadSafe()) {
buf = ChannelBuffers.wrappedBuffer(response.content(), 0, response.contentLength());
if (response instanceof XContentRestResponse) {
// if its a builder based response, and it was created with a CachedStreamOutput, we can release it
// after we write the response, and no need to do an extra copy because its not thread safe
XContentBuilder builder = ((XContentRestResponse) response).builder();
if (builder.payload() instanceof CachedStreamOutput.Entry) {
releaseContentListener = new NettyTransport.CacheFutureListener((CachedStreamOutput.Entry) builder.payload());
buf = ChannelBuffers.wrappedBuffer(builder.unsafeBytes(), 0, builder.unsafeBytesLength());
} else if (response.contentThreadSafe()) {
buf = ChannelBuffers.wrappedBuffer(response.content(), 0, response.contentLength());
} else {
buf = ChannelBuffers.copiedBuffer(response.content(), 0, response.contentLength());
}
} else {
buf = ChannelBuffers.copiedBuffer(response.content(), 0, response.contentLength());
if (response.contentThreadSafe()) {
buf = ChannelBuffers.wrappedBuffer(response.content(), 0, response.contentLength());
} else {
buf = ChannelBuffers.copiedBuffer(response.content(), 0, response.contentLength());
}
}
} catch (IOException e) {
throw new HttpException("Failed to convert response to bytes", e);
Expand Down Expand Up @@ -116,6 +142,9 @@ public NettyHttpChannel(Channel channel, org.elasticsearch.common.netty.handler.

// Write the response.
ChannelFuture future = channel.write(resp);
if (releaseContentListener != null) {
future.addListener(releaseContentListener);
}

// Close the connection after the write operation is done if necessary.
if (close) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.io.FastStringReader;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
Expand All @@ -48,7 +48,12 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.field.data.FieldData;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.query.IndexQueryParser;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.query.QueryBuilder;
Expand Down Expand Up @@ -194,7 +199,7 @@ public void addQuery(String name, QueryBuilder queryBuilder) throws ElasticSearc
try {
XContentBuilder builder = XContentFactory.smileBuilder()
.startObject().field("query", queryBuilder).endObject();
FastByteArrayOutputStream unsafeBytes = builder.unsafeStream();
BytesStream unsafeBytes = builder.unsafeStream();
addQuery(name, unsafeBytes.unsafeByteArray(), 0, unsafeBytes.size());
} catch (IOException e) {
throw new ElasticSearchException("Failed to add query [" + name + "]", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@

package org.elasticsearch.index.query;

import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.xcontent.XContentType;

/**
* @author kimchy (shay.banon)
*/
public interface QueryBuilder {

FastByteArrayOutputStream buildAsUnsafeBytes() throws QueryBuilderException;
BytesStream buildAsUnsafeBytes() throws QueryBuilderException;

FastByteArrayOutputStream buildAsUnsafeBytes(XContentType contentType) throws QueryBuilderException;
BytesStream buildAsUnsafeBytes(XContentType contentType) throws QueryBuilderException;

byte[] buildAsBytes() throws QueryBuilderException;

Expand Down
Loading

0 comments on commit 68a56a0

Please sign in to comment.