Skip to content

Commit

Permalink
[#11186] Change gRPC UNARY response based on header value
Browse files Browse the repository at this point in the history
  • Loading branch information
donghun-cho authored and feelform committed Jul 3, 2024
1 parent 887ec53 commit 2226a97
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.navercorp.pinpoint.grpc.client.config.ClientOption;
import com.navercorp.pinpoint.grpc.client.config.ClientRetryOption;
import com.navercorp.pinpoint.grpc.client.retry.HedgingServiceConfigBuilder;
import com.navercorp.pinpoint.grpc.client.retry.RetryHeaderFactory;
import com.navercorp.pinpoint.io.ResponseMessage;
import com.navercorp.pinpoint.profiler.context.grpc.config.GrpcTransportConfig;
import com.navercorp.pinpoint.profiler.context.module.MetadataDataSender;
Expand Down Expand Up @@ -109,7 +110,7 @@ protected ChannelFactoryBuilder newChannelFactoryBuilder(boolean sslEnable, bool
final String factoryName = getChannelFactoryName(clientRetryEnable);

ChannelFactoryBuilder channelFactoryBuilder = new DefaultChannelFactoryBuilder(factoryName);
channelFactoryBuilder.setHeaderFactory(headerFactory);
channelFactoryBuilder.setHeaderFactory(getHeaderFactory(clientRetryEnable));
channelFactoryBuilder.setNameResolverProvider(nameResolverProvider);
channelFactoryBuilder.addClientInterceptor(unaryCallDeadlineInterceptor);
if (clientInterceptorList != null) {
Expand Down Expand Up @@ -147,4 +148,11 @@ private String getChannelFactoryName(boolean clientRetryEnable) {
}
return MetadataGrpcDataSender.class.getSimpleName();
}

private HeaderFactory getHeaderFactory(boolean clientRetryEnable) {
if (clientRetryEnable) {
return new RetryHeaderFactory(headerFactory);
}
return headerFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public LogResponseStreamObserver(Logger logger, String name, long requestId) {
@Override
public void onNext(ResT response) {
if (logger.isDebugEnabled()) {
logger.debug("{} Request success. result={}", logString(response));
logger.debug("{} Request success. result={}", name, logString(response));
}
}

Expand All @@ -64,7 +64,7 @@ public void onError(Throwable throwable) {
@Override
public void onCompleted() {
if (logger.isDebugEnabled()) {
logger.debug("{} onCompleted. requestCount={}", requestId, name);
logger.debug("{} onCompleted. requestId={}", name, requestId);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.collector.receiver.grpc.retry;

import com.navercorp.pinpoint.grpc.trace.PResult;
import com.navercorp.pinpoint.io.request.ServerResponse;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;

import java.util.Objects;

public class GrpcRetryFriendlyServerResponse<T> implements ServerResponse<T> {
private final StreamObserver<T> responseObserver;

public GrpcRetryFriendlyServerResponse(StreamObserver<T> responseObserver) {
this.responseObserver = Objects.requireNonNull(responseObserver, "responseObserver");
}

@Override
public void write(final T message) {
Objects.requireNonNull(message, "message");
if (message instanceof final PResult pResult) {
if (!pResult.getSuccess()) {
responseObserver.onError(Status.UNAVAILABLE.withDescription(pResult.getMessage()).asRuntimeException());
return;
}
}

responseObserver.onNext(message);
responseObserver.onCompleted();
}

@Override
public void finish() {
responseObserver.onCompleted();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.navercorp.pinpoint.collector.receiver.DispatchHandler;
import com.navercorp.pinpoint.collector.receiver.grpc.GrpcServerResponse;
import com.navercorp.pinpoint.collector.receiver.grpc.retry.GrpcRetryFriendlyServerResponse;
import com.navercorp.pinpoint.grpc.server.ServerContext;
import com.navercorp.pinpoint.io.request.Message;
import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.io.request.ServerResponse;
Expand Down Expand Up @@ -49,7 +51,7 @@ public SimpleRequestHandlerAdaptor(String name, DispatchHandler<REQ, RES> dispat
public void request(Message<? extends REQ> message, StreamObserver<? extends RES> responseObserver) {
try {
final ServerRequest<? extends REQ> request = serverRequestFactory.newServerRequest(message);
final ServerResponse<? extends RES> response = new GrpcServerResponse<>(responseObserver);
final ServerResponse<? extends RES> response = newServerResponse(responseObserver);
this.dispatchHandler.dispatchRequestMessage((ServerRequest<REQ>) request, (ServerResponse<RES>) response);
} catch (Exception e) {
logger.warn("Failed to request. message={}", message, e);
Expand All @@ -61,4 +63,11 @@ public void request(Message<? extends REQ> message, StreamObserver<? extends RES
}
}
}

private ServerResponse<? extends RES> newServerResponse(StreamObserver<? extends RES> responseObserver) {
if (ServerContext.getAgentInfo().isRetryFriendlyResponse()) {
return new GrpcRetryFriendlyServerResponse<>(responseObserver);
}
return new GrpcServerResponse<>(responseObserver);
}
}
15 changes: 14 additions & 1 deletion grpc/src/main/java/com/navercorp/pinpoint/grpc/Header.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class Header {
public static final Metadata.Key<String> SOCKET_ID = newStringKey("socketid");
public static final Metadata.Key<String> SERVICE_TYPE_KEY = newStringKey("servicetype");
public static final Metadata.Key<String> SUPPORT_COMMAND_CODE = newStringKey("supportCommandCode");
public static final Metadata.Key<String> RETRY_FRIENDLY_RESPONSE = newStringKey("retry-friendly-response");

public static final String SUPPORT_COMMAND_CODE_DELIMITER = ";";

Expand All @@ -49,6 +50,7 @@ private static Metadata.Key<String> newStringKey(String s) {

public static final List<Integer> SUPPORT_COMMAND_CODE_LIST_NOT_EXIST = null;
public static final List<Integer> SUPPORT_COMMAND_CODE_LIST_PARSE_ERROR = Collections.emptyList();
public static final boolean DEFAULT_RETRY_FRIENDLY_RESPONSE = false;

private final String name;
private final String agentId;
Expand All @@ -58,21 +60,24 @@ private static Metadata.Key<String> newStringKey(String s) {
private final long socketId;
private final int serviceType;
private final List<Integer> supportCommandCodeList;
private final boolean retryFriendlyResponse;
private final Map<String, Object> properties;

public Header(String name, String agentId, String agentName, String applicationName,
int serviceType, long agentStartTime,
long socketId, List<Integer> supportCommandCodeList) {
this(name, agentId, agentName, applicationName,
serviceType, agentStartTime,
socketId, supportCommandCodeList, Collections.emptyMap());
socketId, supportCommandCodeList,
DEFAULT_RETRY_FRIENDLY_RESPONSE, Collections.emptyMap());
}

public Header(String name,
String agentId, String agentName, String applicationName,
int serviceType,
long agentStartTime, long socketId,
List<Integer> supportCommandCodeList,
boolean retryFriendlyResponse,
final Map<String, Object> properties) {
this.name = Objects.requireNonNull(name, "name");
this.agentId = Objects.requireNonNull(agentId, "agentId");
Expand All @@ -83,6 +88,7 @@ public Header(String name,
// allow null
this.agentName = agentName;
this.supportCommandCodeList = supportCommandCodeList;
this.retryFriendlyResponse = retryFriendlyResponse;
this.properties = Objects.requireNonNull(properties, "properties");
}

Expand Down Expand Up @@ -114,6 +120,10 @@ public List<Integer> getSupportCommandCodeList() {
return supportCommandCodeList;
}

public boolean isRetryFriendlyResponse() {
return retryFriendlyResponse;
}

public Object get(String key) {
return properties.get(key);
}
Expand All @@ -133,6 +143,7 @@ public String toString() {
", socketId=" + socketId +
", serviceType=" + serviceType +
", supportCommandCodeList=" + supportCommandCodeList +
", retryFriendlyResponse='" + retryFriendlyResponse + '\'' +
", properties=" + properties +
'}';
}
Expand All @@ -151,6 +162,7 @@ public boolean equals(Object o) {
if (agentId != null ? !agentId.equals(header.agentId) : header.agentId != null) return false;
if (applicationName != null ? !applicationName.equals(header.applicationName) : header.applicationName != null)
return false;
if (retryFriendlyResponse != header.retryFriendlyResponse) return false;
if (supportCommandCodeList != null ? !supportCommandCodeList.equals(header.supportCommandCodeList) : header.supportCommandCodeList != null)
return false;
return properties != null ? properties.equals(header.properties) : header.properties == null;
Expand All @@ -165,6 +177,7 @@ public int hashCode() {
result = 31 * result + (int) (socketId ^ (socketId >>> 32));
result = 31 * result + serviceType;
result = 31 * result + (supportCommandCodeList != null ? supportCommandCodeList.hashCode() : 0);
result = 31 * result + (retryFriendlyResponse ? 1 : 0);
result = 31 * result + (properties != null ? properties.hashCode() : 0);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.navercorp.pinpoint.grpc.client.retry;

import com.navercorp.pinpoint.grpc.Header;
import com.navercorp.pinpoint.grpc.client.HeaderFactory;
import io.grpc.Metadata;

import java.util.Objects;

public class RetryHeaderFactory implements HeaderFactory {

private final HeaderFactory headerFactory;

public RetryHeaderFactory(HeaderFactory headerFactory) {
this.headerFactory = Objects.requireNonNull(headerFactory, "headerFactory");
}

@Override
public Metadata newHeader() {
Metadata metadata = headerFactory.newHeader();
metadata.put(Header.RETRY_FRIENDLY_RESPONSE, Boolean.TRUE.toString());
return metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ public Header extract(Metadata headers) {
final int serviceType = getServiceType(headers);
final long socketId = getSocketId(headers);
final List<Integer> supportCommandCodeList = getSupportCommandCodeList(headers);
final boolean retryFriendlyResponse = getRetryFriendlyResponse(headers);
final Map<String, Object> properties = metadataConverter.apply(headers);
return new Header(name, agentId, agentName, applicationName, serviceType, startTime, socketId, supportCommandCodeList, properties);
return new Header(name, agentId, agentName, applicationName, serviceType, startTime, socketId, supportCommandCodeList, retryFriendlyResponse, properties);
}

public static Map<String, Object> emptyProperties(Metadata headers) {
Expand Down Expand Up @@ -141,6 +142,14 @@ protected List<Integer> getSupportCommandCodeList(Metadata headers) {
}
}

protected boolean getRetryFriendlyResponse(Metadata headers) {
final String value = headers.get(Header.RETRY_FRIENDLY_RESPONSE);
if (value != null) {
return Boolean.parseBoolean(value);
}
return Header.DEFAULT_RETRY_FRIENDLY_RESPONSE;
}

String validateId(String id, Metadata.Key key) {
if (!IdValidateUtils.validateId(id)) {
throw Status.INVALID_ARGUMENT.withDescription("invalid " + key.name()).asRuntimeException();
Expand Down

0 comments on commit 2226a97

Please sign in to comment.