Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions drivers/115_open/upload.go

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When uploading multiple files at the same time, is locking required to prevent race condition?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think an extra lock is needed.

The mutable state changed by the refresh logic (bucket, tokenObtained, and the refreshOSSToken closure) is local to one multpartUpload call. Each concurrent file upload enters Put separately, gets its own UploadInit response, OSS token, OSS client/bucket, multipart upload session (imur), and parts slice.

Within a single multipart upload, parts are uploaded sequentially in the loop; retry.Do is synchronous and no goroutine shares bucket while it can be replaced. So refreshing the token only affects the current upload's local bucket handle.

Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,30 @@ func (d *Open115) multpartUpload(ctx context.Context, stream model.FileStreamer,
return err
}

// The OSS STS token returned by UploadGetToken expires in about 1 hour.
// A slow or large upload can outlive it, causing SecurityTokenExpired /
// InvalidAccessKeyId. Refresh the token ahead of expiry and rebuild the OSS
// client with the new credentials, reusing the same multipart session (imur)
// to keep uploading the remaining parts.
tokenObtained := time.Now()
refreshOSSToken := func() error {
newToken, err := d.client.UploadGetToken(ctx)
if err != nil {
return err
}
newClient, err := netutil.NewOSSClient(newToken.Endpoint, newToken.AccessKeyId, newToken.AccessKeySecret, oss.SecurityToken(newToken.SecurityToken))
if err != nil {
return err
}
newBucket, err := newClient.Bucket(initResp.Bucket)
if err != nil {
return err
}
bucket = newBucket
tokenObtained = time.Now()
return nil
}

fileSize := stream.GetSize()
chunkSize := calPartSize(fileSize)
ss, err := streamPkg.NewStreamSectionReader(stream, int(chunkSize), &up)
Expand All @@ -100,6 +124,13 @@ func (d *Open115) multpartUpload(ctx context.Context, stream model.FileStreamer,
return ctx.Err()
}

// The token lives ~1 hour; refresh after 45 minutes to keep a safe margin.
if time.Since(tokenObtained) > 45*time.Minute {
if err := refreshOSSToken(); err != nil {
return err
}
}

partSize := chunkSize
if i == partNum {
partSize = fileSize - (i-1)*chunkSize
Expand Down
Loading