Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] (nereids)implement CreateStageCommand and DropStageComm… #48478

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ supportedCreateStatement
functionIdentifier LEFT_PAREN functionArguments? RIGHT_PAREN
WITH PARAMETER LEFT_PAREN parameters=identifierSeq? RIGHT_PAREN
AS expression #createAliasFunction
| CREATE STAGE (IF NOT EXISTS)? name=identifier properties=propertyClause? #createStage
;

supportedAlterStatement
Expand Down Expand Up @@ -267,6 +268,7 @@ supportedDropStatement
| DROP statementScope? FUNCTION (IF EXISTS)?
functionIdentifier LEFT_PAREN functionArguments? RIGHT_PAREN #dropFunction
| DROP INDEX (IF EXISTS)? name=identifier ON tableName=multipartIdentifier #dropIndex
| DROP STAGE (IF EXISTS)? name=identifier #dropStage
;

supportedShowStatement
Expand Down Expand Up @@ -723,7 +725,6 @@ unsupportedDropStatement
| DROP ROW POLICY (IF EXISTS)? policyName=identifier
ON tableName=multipartIdentifier
(FOR (userIdentify | ROLE roleName=identifier))? #dropRowPolicy
| DROP STAGE (IF EXISTS)? name=identifier #dropStage
;

supportedStatsStatement
Expand Down Expand Up @@ -787,7 +788,6 @@ unsupportedCreateStatement
(CONDITIONS LEFT_PAREN workloadPolicyConditions RIGHT_PAREN)?
(ACTIONS LEFT_PAREN workloadPolicyActions RIGHT_PAREN)?
properties=propertyClause? #createWorkloadPolicy
| CREATE STAGE (IF NOT EXISTS)? name=identifier properties=propertyClause? #createStage
;

workloadPolicyActions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.commands.CreateStageCommand;
import org.apache.doris.nereids.trees.plans.commands.DropStageCommand;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService.HostInfo;
Expand Down Expand Up @@ -333,6 +335,15 @@ public void createStage(CreateStageStmt stmt) throws DdlException {
}
}

public void createStage(CreateStageCommand command) throws DdlException {
if (Config.isNotCloudMode()) {
throw new DdlException("stage is only supported in cloud mode");
}
if (!command.isDryRun()) {
((CloudInternalCatalog) getInternalCatalog()).createStage(command.toStageProto(), command.isIfNotExists());
}
}

public void dropStage(DropStageStmt stmt) throws DdlException {
if (Config.isNotCloudMode()) {
throw new DdlException("stage is only supported in cloud mode");
Expand All @@ -341,6 +352,14 @@ public void dropStage(DropStageStmt stmt) throws DdlException {
null, null, stmt.getStageName(), null, stmt.isIfExists());
}

public void dropStage(DropStageCommand command) throws DdlException {
if (Config.isNotCloudMode()) {
throw new DdlException("stage is only supported in cloud mode");
}
((CloudInternalCatalog) getInternalCatalog()).dropStage(Cloud.StagePB.StageType.EXTERNAL,
null, null, command.getStageName(), null, command.isIfExists());
}

public long loadCloudWarmUpJob(DataInputStream dis, long checksum) throws Exception {
int size = dis.readInt();
long newChecksum = checksum ^ size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@
import org.apache.doris.nereids.trees.plans.commands.CreateProcedureCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateRoleCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateSqlBlockRuleCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateStageCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateTableLikeCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateViewCommand;
Expand All @@ -570,6 +571,7 @@
import org.apache.doris.nereids.trees.plans.commands.DropRepositoryCommand;
import org.apache.doris.nereids.trees.plans.commands.DropRoleCommand;
import org.apache.doris.nereids.trees.plans.commands.DropSqlBlockRuleCommand;
import org.apache.doris.nereids.trees.plans.commands.DropStageCommand;
import org.apache.doris.nereids.trees.plans.commands.DropStoragePolicyCommand;
import org.apache.doris.nereids.trees.plans.commands.DropTableCommand;
import org.apache.doris.nereids.trees.plans.commands.DropUserCommand;
Expand Down Expand Up @@ -6015,5 +6017,24 @@ public LogicalPlan visitDescribeTableValuedFunction(DorisParser.DescribeTableVal
}
return new DescribeCommand(tableValuedFunctionRef);
}

@Override
public LogicalPlan visitCreateStage(DorisParser.CreateStageContext ctx) {
String stageName = ctx.name.getText();
Map<String, String> properties = Maps.newHashMap(visitPropertyClause(ctx.properties));

CreateStageCommand command = new CreateStageCommand(
ctx.IF() != null,
stageName,
properties
);

return command;
}

@Override
public LogicalPlan visitDropStage(DorisParser.DropStageContext ctx) {
return new DropStageCommand(ctx.IF() != null, ctx.name.getText());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -305,5 +305,7 @@ public enum PlanType {
ALTER_SYSTEM_SET_LOAD_ERRORS_HU,
ALTER_SYSTEM_MODIFY_BACKEND,
ALTER_SYSTEM_MODIFY_FRONTEND_OR_BACKEND_HOSTNAME,
ALTER_SYSTEM_RENAME_COMPUTE_GROUP
ALTER_SYSTEM_RENAME_COMPUTE_GROUP,
CREATE_STAGE,
DROP_STAGE
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.analysis.ResourceTypeEnum;
import org.apache.doris.analysis.StageProperties;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.datasource.CloudInternalCatalog;
import org.apache.doris.cloud.proto.Cloud.GetIamResponse;
import org.apache.doris.cloud.proto.Cloud.ObjectStoreInfoPB;
import org.apache.doris.cloud.proto.Cloud.RamUserPB;
import org.apache.doris.cloud.proto.Cloud.StagePB;
import org.apache.doris.cloud.proto.Cloud.StagePB.StageAccessType;
import org.apache.doris.cloud.proto.Cloud.StagePB.StageType;
import org.apache.doris.cloud.storage.RemoteBase;
import org.apache.doris.cloud.storage.RemoteBase.ObjectInfo;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.util.Map;
import java.util.UUID;

/**
* CreateStageCommand
*/
public class CreateStageCommand extends Command implements ForwardWithSync, NeedAuditEncryption {
private static final Logger LOG = LogManager.getLogger(CreateStageCommand.class);
protected StagePB.StageType type;
private final boolean ifNotExists;
private final String stageName;
private StageProperties stageProperties;

public CreateStageCommand(boolean ifNotExists, String stageName, Map<String, String> properties) {
super(PlanType.CREATE_STAGE);
this.ifNotExists = ifNotExists;
this.stageName = stageName;
this.stageProperties = new StageProperties(properties);
this.type = StagePB.StageType.EXTERNAL;
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitCreateStageCommand(this, context);
}

@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
validate();
((CloudEnv) Env.getCurrentEnv()).createStage(this);
}

private void validate() throws UserException {
// check auth
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
// check stage name
FeNameFormat.checkResourceName(stageName, ResourceTypeEnum.STAGE);
stageProperties.analyze();
checkObjectStorageInfo();
}

private void checkObjectStorageInfo() throws UserException {
RemoteBase remote = null;
try {
tryConnect(stageProperties.getEndpoint());
StagePB stagePB = toStageProto();
if (stagePB.getAccessType() == StageAccessType.IAM
|| stagePB.getAccessType() == StageAccessType.BUCKET_ACL) {
GetIamResponse iamUsers = ((CloudInternalCatalog) Env.getCurrentInternalCatalog()).getIam();
RamUserPB user;
if (stagePB.getAccessType() == StageAccessType.BUCKET_ACL) {
if (!iamUsers.hasRamUser()) {
throw new AnalysisException("Instance does not have ram user");
}
user = iamUsers.getRamUser();
} else {
user = iamUsers.getIamUser();
}
ObjectStoreInfoPB objInfoPB = ObjectStoreInfoPB.newBuilder(stagePB.getObjInfo()).setAk(user.getAk())
.setSk(user.getSk()).build();
stagePB = StagePB.newBuilder(stagePB).setExternalId(user.getExternalId()).setObjInfo(objInfoPB).build();
}
ObjectInfo objectInfo = RemoteBase.analyzeStageObjectStoreInfo(stagePB);
remote = RemoteBase.newInstance(objectInfo);
// RemoteBase#headObject does not throw exception if key does not exist.
remote.headObject("1");
remote.listObjects(null);
} catch (Exception e) {
LOG.warn("Failed to access object storage, proto={}, err={}",
stageProperties.getObjectStoreInfoPB(), e.toString());
String msg;
if (e instanceof UserException) {
msg = ((UserException) e).getDetailMessage();
} else {
msg = e.getMessage();
}
throw new UserException(InternalErrorCode.GET_REMOTE_DATA_ERROR,
"Failed to access object storage, message=" + msg, e);
} finally {
if (remote != null) {
remote.close();
}
}
}

private void tryConnect(String endpoint) throws Exception {
HttpURLConnection connection = null;
try {
String urlStr = "http://" + endpoint;
// TODO: Server-Side Request Forgery Check is still need?
URL url = new URL(urlStr);
connection = (HttpURLConnection) url.openConnection();
connection.setConnectTimeout(10000);
connection.connect();
} catch (SocketTimeoutException e) {
throw e;
} catch (Exception e) {
LOG.warn("Failed to connect endpoint=" + endpoint, e);
throw e;
} finally {
if (connection != null) {
try {
connection.disconnect();
} catch (Exception e) {
LOG.warn("Failed to disconnect connection, endpoint={}", endpoint, e);
}
}
}
}

/**
* StagePB
*/
public StagePB toStageProto() throws DdlException {
StagePB.Builder stageBuilder = StagePB.newBuilder();
// external stage doesn't need username
stageBuilder.setStageId(UUID.randomUUID().toString());
switch (type) {
case EXTERNAL:
stageBuilder.setName(getStageName()).setType(StageType.EXTERNAL)
.setObjInfo(stageProperties.getObjectStoreInfoPB()).setComment(stageProperties.getComment())
.setCreateTime(System.currentTimeMillis()).setAccessType(stageProperties.getAccessType());
break;
case INTERNAL:
default:
throw new DdlException("Can not create stage with type=" + type);
}
stageBuilder.putAllProperties(stageProperties.getDefaultProperties());
if (stageBuilder.getAccessType() == StageAccessType.IAM) {
stageBuilder.setRoleName(stageProperties.getRoleName()).setArn(stageProperties.getArn());
}
return stageBuilder.build();
}

public boolean isDryRun() {
return stageProperties.isDryRun();
}

@Override
public boolean needAuditEncryption() {
return true;
}

public boolean isIfNotExists() {
return ifNotExists;
}

public String getStageName() {
return stageName;
}

public StageProperties getStageProperties() {
return stageProperties;
}
}
Loading