Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,13 @@ private String createContentCloud(final GitCreateContentRequest request, final S
multipartBuilder.addPart(FIELD_BRANCH, StandardHttpContentType.TEXT_PLAIN, branch.getBytes(StandardCharsets.UTF_8));
multipartBuilder.addPart(FIELD_PARENTS, StandardHttpContentType.TEXT_PLAIN, branchHead.getBytes(StandardCharsets.UTF_8));

final String authorName = request.getAuthorName();
final String authorEmail = request.getAuthorEmail();
if (authorName != null && authorEmail != null) {
final String authorValue = "%s <%s>".formatted(authorName, authorEmail);
multipartBuilder.addPart(FIELD_AUTHOR, StandardHttpContentType.TEXT_PLAIN, authorValue.getBytes(StandardCharsets.UTF_8));
}

final HttpResponseEntity response = this.webClient.getWebClientService()
.post()
.uri(uri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ public String createContent(final GitCreateContentRequest request) throws FlowRe
.addQueryParameter(API, API_VERSION)
.build();

final String authorName = request.getAuthorName();
final String authorEmail = request.getAuthorEmail();
final Author author = (authorName != null && authorEmail != null) ? new Author(authorName, authorEmail) : null;

for (int attempt = 1; attempt <= MAX_PUSH_ATTEMPTS; attempt++) {
if (expectedFileCommit != null) {
final Optional<String> currentFileCommit = getContentSha(request.getPath(), branch);
Expand All @@ -385,7 +389,7 @@ public String createContent(final GitCreateContentRequest request) throws FlowRe
}

final String branchHead = fetchBranchHead(branch);
final HttpResponseEntity response = executePush(pushUri, branch, branchHead, encoded, message, changeType, path);
final HttpResponseEntity response = executePush(pushUri, branch, branchHead, encoded, message, changeType, path, author);

if (response.statusCode() == HttpURLConnection.HTTP_CREATED) {
try {
Expand Down Expand Up @@ -421,10 +425,10 @@ private String fetchBranchHead(final String branch) throws FlowRegistryException

private HttpResponseEntity executePush(final URI pushUri, final String branch, final String oldObjectId,
final String encodedContent, final String message,
final String changeType, final String path) throws FlowRegistryException {
final String changeType, final String path, final Author author) throws FlowRegistryException {
final PushRequest pushRequest = new PushRequest(
List.of(new RefUpdate(REFS_HEADS_PREFIX + branch, oldObjectId)),
List.of(new Commit(message, List.of(new Change(changeType, new Item(path), new NewContent(encodedContent, CONTENT_TYPE_BASE64)))))
List.of(new Commit(message, List.of(new Change(changeType, new Item(path), new NewContent(encodedContent, CONTENT_TYPE_BASE64))), author))
);

final String json;
Expand Down Expand Up @@ -455,8 +459,7 @@ public InputStream deleteContent(final String filePath, final String commitMessa

final PushRequest pushRequest = new PushRequest(
List.of(new RefUpdate(REFS_HEADS_PREFIX + branch, oldObjectId)),
List.of(new Commit(commitMessage,
List.of(new Change(CHANGE_TYPE_DELETE, new Item(path), null))))
List.of(new Commit(commitMessage, List.of(new Change(CHANGE_TYPE_DELETE, new Item(path), null)), null))
);

final String json = MAPPER.writeValueAsString(pushRequest);
Expand Down Expand Up @@ -506,7 +509,10 @@ private record PushRequest(List<RefUpdate> refUpdates, List<Commit> commits) { }

private record RefUpdate(String name, String oldObjectId) { }

private record Commit(String comment, List<Change> changes) { }
@JsonInclude(JsonInclude.Include.NON_NULL)
private record Commit(String comment, List<Change> changes, Author author) { }

private record Author(String name, String email) { }

private record Item(String path) { }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@ public abstract class AbstractGitFlowRegistryClient extends AbstractFlowRegistry
.identifiesControllerService(SSLContextProvider.class)
.build();

public static final PropertyDescriptor COMMIT_AUTHOR_SOURCE = new PropertyDescriptor.Builder()
.name("Commit Author Source")
.description("""
Specifies how the commit author is determined for Git commits. \
When set to Service User, the authenticated service account is used as the commit author. \
When set to Application User, the identity of the NiFi user performing the action is used as \
both the author name and author email, while the service account remains the committer.""")
.allowableValues(CommitAuthorSource.class)
.defaultValue(CommitAuthorSource.SERVICE_USER)
.required(true)
.build();

static final String DEFAULT_BUCKET_NAME = "default";
static final String DEFAULT_BUCKET_KEEP_FILE_PATH = DEFAULT_BUCKET_NAME + "/.keep";
static final String DEFAULT_BUCKET_KEEP_FILE_CONTENT = "Do Not Delete";
Expand Down Expand Up @@ -148,6 +160,7 @@ public void initialize(final FlowRegistryClientInitializationContext context) {
combinedPropertyDescriptors.add(REPOSITORY_PATH);
combinedPropertyDescriptors.add(DIRECTORY_FILTER_EXCLUDE);
combinedPropertyDescriptors.add(PARAMETER_CONTEXT_VALUES);
combinedPropertyDescriptors.add(COMMIT_AUTHOR_SOURCE);
combinedPropertyDescriptors.add(SSL_CONTEXT_SERVICE);
propertyDescriptors = Collections.unmodifiableList(combinedPropertyDescriptors);

Expand Down Expand Up @@ -238,6 +251,7 @@ public RegisteredFlow registerFlow(final FlowRegistryClientConfigurationContext
final FlowLocation flowLocation = new FlowLocation(branch, flow.getBucketIdentifier(), flow.getIdentifier());
final String filePath = getSnapshotFilePath(flowLocation);
final String commitMessage = REGISTER_FLOW_MESSAGE_FORMAT.formatted(flow.getIdentifier());
final String userIdentity = resolveAuthorIdentity(context);

final Optional<String> existingFileSha = repositoryClient.getContentSha(filePath, branch);
if (existingFileSha.isPresent()) {
Expand All @@ -258,6 +272,8 @@ public RegisteredFlow registerFlow(final FlowRegistryClientConfigurationContext
.path(filePath)
.content(flowSnapshotSerializer.serialize(flowSnapshot))
.message(commitMessage)
.authorName(userIdentity)
.authorEmail(userIdentity)
.build();

repositoryClient.createContent(request);
Expand All @@ -278,7 +294,8 @@ public RegisteredFlow deregisterFlow(final FlowRegistryClientConfigurationContex
final String branch = flowLocation.getBranch();
final String filePath = getSnapshotFilePath(flowLocation);
final String commitMessage = DEREGISTER_FLOW_MESSAGE_FORMAT.formatted(flowLocation.getFlowId());
try (final InputStream deletedSnapshotContent = repositoryClient.deleteContent(filePath, commitMessage, branch)) {
final String userIdentity = resolveAuthorIdentity(context);
try (final InputStream deletedSnapshotContent = repositoryClient.deleteContent(filePath, commitMessage, branch, userIdentity, userIdentity)) {
final RegisteredFlowSnapshot deletedSnapshot = getSnapshot(deletedSnapshotContent);
populateFlowAndSnapshotMetadata(deletedSnapshot, flowLocation);
updateBucketReferences(repositoryClient, deletedSnapshot, flowLocation.getBucketId());
Expand Down Expand Up @@ -437,13 +454,17 @@ public RegisteredFlowSnapshot registerFlowSnapshot(final FlowRegistryClientConfi
final String originalFlowContentsGroupId = replaceGroupId(flowSnapshot.getFlowContents(), FLOW_CONTENTS_GROUP_ID);
final Position originalFlowContentsPosition = replacePosition(flowSnapshot.getFlowContents(), new Position(0, 0));

final String userIdentity = resolveAuthorIdentity(context);

final GitCreateContentRequest createContentRequest = GitCreateContentRequest.builder()
.branch(branch)
.path(filePath)
.content(flowSnapshotSerializer.serialize(flowSnapshot))
.message(commitMessage)
.existingContentSha(existingBlobSha)
.expectedCommitSha(expectedVersion)
.authorName(userIdentity)
.authorEmail(userIdentity)
.build();

final String createContentCommitSha = repositoryClient.createContent(createContentRequest);
Expand Down Expand Up @@ -656,6 +677,11 @@ private void verifyWritePermissions(final GitRepositoryClient repositoryClient)
}
}

private String resolveAuthorIdentity(final FlowRegistryClientConfigurationContext context) {
final CommitAuthorSource source = context.getProperty(COMMIT_AUTHOR_SOURCE).asAllowableValue(CommitAuthorSource.class);
return CommitAuthorSource.APPLICATION_USER.equals(source) ? context.getNiFiUserIdentity().orElse(null) : null;
}

private void verifyReadPermissions(final GitRepositoryClient repositoryClient) throws AuthorizationException {
if (!repositoryClient.hasReadPermission()) {
throw new AuthorizationException("Client does not have read access to the repository");
Expand Down Expand Up @@ -839,6 +865,34 @@ protected FlowSnapshotSerializer createFlowSnapshotSerializer() {
return new JacksonFlowSnapshotSerializer();
}

enum CommitAuthorSource implements DescribedValue {
SERVICE_USER("Service User", "The commit author is the authenticated service account configured on this registry client"),
APPLICATION_USER("Application User", "The commit author is the NiFi user performing the action, using the identity as both author name and author email");

private final String displayName;
private final String description;

CommitAuthorSource(final String displayName, final String description) {
this.displayName = displayName;
this.description = description;
}

@Override
public String getValue() {
return name();
}

@Override
public String getDisplayName() {
return displayName;
}

@Override
public String getDescription() {
return description;
}
}

enum ParameterContextValuesStrategy implements DescribedValue {
RETAIN("Retain", "Retain Values in Parameter Contexts without modifications"),
REMOVE("Remove", "Remove Values from Parameter Context"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class GitCreateContentRequest {
private final String message;
private final String existingContentSha;
private final String expectedCommitSha;
private final String authorName;
private final String authorEmail;

private GitCreateContentRequest(final Builder builder) {
this.branch = Objects.requireNonNull(builder.branch);
Expand All @@ -37,6 +39,8 @@ private GitCreateContentRequest(final Builder builder) {
this.existingContentSha = builder.existingContentSha;
// Commit SHA for providers that support atomic commits via commit SHA
this.expectedCommitSha = builder.expectedCommitSha;
this.authorName = builder.authorName;
this.authorEmail = builder.authorEmail;
}

public String getBranch() {
Expand All @@ -63,6 +67,14 @@ public String getExpectedCommitSha() {
return expectedCommitSha;
}

public String getAuthorName() {
return authorName;
}

public String getAuthorEmail() {
return authorEmail;
}

public static Builder builder() {
return new Builder();
}
Expand All @@ -74,6 +86,8 @@ public static final class Builder {
private String message;
private String existingContentSha;
private String expectedCommitSha;
private String authorName;
private String authorEmail;

public Builder branch(final String branch) {
this.branch = branch;
Expand Down Expand Up @@ -105,6 +119,16 @@ public Builder expectedCommitSha(final String expectedCommitSha) {
return this;
}

public Builder authorName(final String authorName) {
this.authorName = authorName;
return this;
}

public Builder authorEmail(final String authorEmail) {
this.authorEmail = authorEmail;
return this;
}

public GitCreateContentRequest build() {
return new GitCreateContentRequest(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,25 @@ default Optional<String> getContentShaAtCommit(String path, String commitSha) th
*/
InputStream deleteContent(String filePath, String commitMessage, String branch) throws FlowRegistryException, IOException;

/**
* Deletes the file at the given path on the given branch, attributing the commit to the specified author.
*
* The caller of this method is responsible for closing the returned InputStream.
*
* @param filePath the path of the file
* @param commitMessage the commit message
* @param branch the branch
* @param authorName the name of the commit author, or null to use the authenticated user
* @param authorEmail the email of the commit author, or null to use the authenticated user
* @return the input stream to the deleted content
* @throws IOException if an I/O error occurs
* @throws FlowRegistryException if a non-I/O error occurs
*/
default InputStream deleteContent(final String filePath, final String commitMessage, final String branch,
final String authorName, final String authorEmail) throws FlowRegistryException, IOException {
return deleteContent(filePath, commitMessage, branch);
}

/**
* Closes any resources held by the client.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
<groupId>org.kohsuke</groupId>
<artifactId>github-api</artifactId>
<version>${github-api.version}</version>
<exclusions>
<exclusion>
<groupId>com.infradna.tool</groupId>
<artifactId>bridge-method-annotation</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ protected List<PropertyDescriptor> createPropertyDescriptors() {
}

@Override
protected GitHubRepositoryClient createRepositoryClient(final FlowRegistryClientConfigurationContext context) throws IOException, FlowRegistryException {
protected GitRepositoryClient createRepositoryClient(final FlowRegistryClientConfigurationContext context) throws IOException, FlowRegistryException {
return GitHubRepositoryClient.builder()
.logger(getLogger())
.apiUrl(context.getProperty(GITHUB_API_URL).getValue())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import org.apache.nifi.ssl.SSLContextProvider;
import org.kohsuke.github.GHCommit;
import org.kohsuke.github.GHContent;
import org.kohsuke.github.GHContentBuilder;
import org.kohsuke.github.GHContentUpdateResponse;
import org.kohsuke.github.GHMyself;
import org.kohsuke.github.GHPermissionType;
import org.kohsuke.github.GHRef;
import org.kohsuke.github.GHRepository;
import org.kohsuke.github.GHUser;
import org.kohsuke.github.GitHub;
import org.kohsuke.github.GitHubAbuseLimitHandler;
import org.kohsuke.github.GitHubBuilder;
Expand All @@ -50,7 +52,6 @@
import java.io.InputStream;
import java.net.http.HttpClient;
import java.security.PrivateKey;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -201,13 +202,20 @@ public String createContent(final GitCreateContentRequest request) throws IOExce
logger.debug("Creating content at path [{}] on branch [{}] in repo [{}] ", resolvedPath, branch, repository.getName());
return execute(() -> {
try {
final GHContentUpdateResponse response = repository.createContent()
final GHContentBuilder contentBuilder = repository.createContent()
.branch(branch)
.path(resolvedPath)
.content(request.getContent())
.message(request.getMessage())
.sha(request.getExistingContentSha())
.commit();
.sha(request.getExistingContentSha());

final String authorName = request.getAuthorName();
final String authorEmail = request.getAuthorEmail();
if (authorName != null && authorEmail != null) {
contentBuilder.author(authorName, authorEmail);
}

final GHContentUpdateResponse response = contentBuilder.commit();
return response.getCommit().getSha();
} catch (final FileNotFoundException fnf) {
throwPathOrBranchNotFound(fnf, resolvedPath, branch);
Expand Down Expand Up @@ -477,15 +485,13 @@ private GitCommit toGitCommit(final GHCommit ghCommit) throws IOException {

if (commit == null) {
final GHCommit.ShortInfo shortInfo = ghCommit.getCommitShortInfo();
final String author = ghCommit.getAuthor() != null
? ghCommit.getAuthor().getLogin()
: shortInfo.getAuthor().getName();

final GHUser ghUser = ghCommit.getAuthor();
final String author = ghUser != null ? ghUser.getLogin() : shortInfo.getAuthor().getName();
commit = new GitCommit(
ghCommit.getSHA1(),
author,
shortInfo.getMessage(),
Instant.ofEpochMilli(shortInfo.getCommitDate().getTime()));
ghCommit.getSHA1(),
author,
shortInfo.getMessage(),
shortInfo.getCommitDate());
commitCache.put(ghCommit.getSHA1(), commit);
}

Expand Down
Loading
Loading