Latest update: 09/03/2025
When users select Amazon S3 as the target storage for a Data Hub task, the following preparations are required on the AWS side.
1. Prerequisites
● A valid AWS account.
● Permissions to create S3 Buckets, IAM users/policies.
● Access to the AWS Console.
2. Creating a Bucket
1. Log in to the AWS S3 Console.
2. Create a new bucket:
a. If no S3 bucket exists yet, refer to AWS documentation on creating your first S3 bucket.
b. Enter a globally unique bucket name (recommended: include business identifier, e.g., company-xnurta-data).
c. Select a region (e.g., us-east-1, ap-northeast-1, cn-north-1).
d. Keep the other options at their defaults and click Create bucket. The new bucket will appear in the bucket list.
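If you prefer to script bucket creation instead of using the console, the same step can be done with boto3. A minimal sketch, assuming your AWS credentials are already configured and reusing the example bucket name and region from above:
import boto3

s3 = boto3.client("s3", region_name="us-east-1")

# us-east-1 needs no CreateBucketConfiguration; any other region requires one,
# e.g. CreateBucketConfiguration={"LocationConstraint": "ap-northeast-1"}
s3.create_bucket(Bucket="company-xnurta-data")
print("Bucket created")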
3. Retrieving Bucket Information
● Bucket Name: visible in the bucket list (e.g., company-xnurta-data).
● Region: displayed in the top-right corner of the bucket details page (e.g., us-east-1).
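Both values can also be read programmatically. A minimal boto3 sketch, reusing the example bucket name:
import boto3

s3 = boto3.client("s3")

# LocationConstraint is None for buckets in us-east-1, otherwise the region name
location = s3.get_bucket_location(Bucket="company-xnurta-data")
region = location.get("LocationConstraint") or "us-east-1"
print(f"Bucket region: {region}")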
4. Creating an IAM User and Access Keys
To ensure security, it is recommended to create a dedicated IAM user for Data Hub whose permissions are scoped to the target bucket only.
1. Go to IAM Console → Users → Add user.
2. Enter username (e.g., xnurta-data-task), select Programmatic access.
3. Click Attach policies directly → Create policy.
4. In JSON mode, paste the following policy (replace <BUCKET_NAME>):
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject"
],
"Resource": [
"arn:aws:s3:::<BUCKET_NAME>/*"
]
}
]
}
5. Attach the new policy, complete user creation.
6. The system generates an Access Key ID and a Secret Access Key (the secret is shown only once). Save both securely.
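The same user, policy, and keys can be created with boto3 instead of the console. A minimal sketch, run with an identity that has IAM administration rights; the policy name xnurta-data-task-policy is only an illustrative placeholder:
import json
import boto3

iam = boto3.client("iam")

policy_doc = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["s3:PutObject", "s3:PutObjectAcl", "s3:GetObject"],
        "Resource": ["arn:aws:s3:::company-xnurta-data/*"]
    }]
}

iam.create_user(UserName="xnurta-data-task")
# "xnurta-data-task-policy" is an illustrative policy name
policy = iam.create_policy(PolicyName="xnurta-data-task-policy",
                           PolicyDocument=json.dumps(policy_doc))
iam.attach_user_policy(UserName="xnurta-data-task",
                       PolicyArn=policy["Policy"]["Arn"])

# The Secret Access Key is returned only once -- store it securely
keys = iam.create_access_key(UserName="xnurta-data-task")["AccessKey"]
print(keys["AccessKeyId"], keys["SecretAccessKey"])
Creating a standalone customer-managed policy this way makes it easy to reuse or revoke the permission later.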
5. Testing the Connection
Back in the Data Hub task creation page:
a. Enter:
i. Bucket Name: your bucket name.
ii. Region: bucket region (e.g., us-east-1).
iii. Access Key ID: IAM user’s key ID.
iv. Secret Access Key: IAM user’s secret key.
b. Click Test Connection.
i. The system writes a test file into the bucket.
ii. If successful: “Connection successful”.
iii. If it fails: check the entered information.
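If you want to verify the credentials yourself before clicking Test Connection, you can attempt a similar test write with boto3. A minimal sketch using the IAM user's keys and the example bucket; the object key reports/connection_test.txt is only an illustrative name:
import boto3

s3 = boto3.client(
    "s3",
    aws_access_key_id="<Your-Access-Key-ID>",
    aws_secret_access_key="<Your-Secret-Access-Key>",
    region_name="us-east-1"
)

# Write a small test object; if this succeeds, the keys can write to the bucket
s3.put_object(Bucket="company-xnurta-data",
              Key="reports/connection_test.txt",
              Body=b"connection test")
print("Write succeeded")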
6. Data File
Once configured, the system automatically creates a directory in the bucket, named after the task ID:
/reports/<taskId>/
Example:
/reports/161/
This directory contains:
● Sync log file: sync_log_<taskId>.csv — records execution history.
Field | Type | Description
Execution Record ID | bigint | Unique sequence number for the run (auto-incremented), for tracing/debugging.
Task ID | bigint | Unique ID of the data task (matches <taskId>).
Request ID | UUID | Unique ID for the request, for cross-service tracking.
Trigger Time | datetime | Time when the task was triggered by the scheduler.
Start Time | datetime | Time when execution started.
End Time | datetime | Time when execution finished or failed.
File Send Time | datetime | Time when file transfer to target storage began.
Status | int | Execution status code: 0 pending / 1 running / 2 dispatch success / 3 dispatch fail / 4 file pending / 5 file generation fail / 6 file send success / 7 file send fail / 8 file download fail.
File Name | string | Name of generated zip file (e.g., market_insight_161_20250826_xxx.zip).
File Size (B) | bigint | File size in bytes.
Failure Reason | string | Brief failure reason (permissions, network, timeout). Empty if successful.
● Data zip package: <data_source>_<taskId>_<timestamp>.zip, containing one or more CSV files.
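To inspect what the task directory currently contains, you can list it with boto3. A minimal sketch, reusing the example bucket name and task ID 161:
import boto3

s3 = boto3.client("s3")

# List everything under the task directory (task ID 161 is the example above)
resp = s3.list_objects_v2(Bucket="company-xnurta-data", Prefix="reports/161/")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])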
7. Sample Code: Daily Parsing and System Update
Use case: an enterprise processes Data Hub's recurring outputs daily to update internal systems or data warehouses.
Logic outline:
● Only process “unprocessed” zip files.
● Prefer sync_log_<taskId>.csv to determine completed files; fall back to the S3 object list if it is unavailable.
● Unzip and load tables into database.
● Handle retries and idempotency.
Note: The following code demonstrates the logic only. Actual implementation must be adapted to the enterprise’s own architecture, database type, security, and operations processes.
import boto3
import zipfile
import os
import pandas as pd

# ====== Configuration ======
AWS_ACCESS_KEY_ID = "<Your-Access-Key-ID>"
AWS_SECRET_ACCESS_KEY = "<Your-Secret-Access-Key>"
AWS_REGION = "us-east-1"
BUCKET_NAME = "company-xnurta-data"
TASK_ID = "161"  # Replace with your task ID
LOCAL_DOWNLOAD_PATH = "./downloads"

# ====== Initialize S3 Client ======
s3_client = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=AWS_REGION
)

def fetch_from_sync_log():
    """
    Determine which file to download via sync_log_<taskId>.csv
    """
    # 1. Download the sync_log file
    log_key = f"reports/{TASK_ID}/sync_log_{TASK_ID}.csv"
    log_obj = s3_client.get_object(Bucket=BUCKET_NAME, Key=log_key)
    log_df = pd.read_csv(log_obj["Body"])

    # 2. Filter successful records (Status = 6 means file send success)
    success_logs = log_df[log_df["Status"] == 6]
    if success_logs.empty:
        print("No successful task records found")
        return

    # 3. Get the latest successful record
    latest_record = success_logs.sort_values("File Send Time").iloc[-1]
    file_name = latest_record["File Name"]
    print(f"Downloading file: {file_name}")

    # 4. Download the zip package
    os.makedirs(LOCAL_DOWNLOAD_PATH, exist_ok=True)
    zip_path = os.path.join(LOCAL_DOWNLOAD_PATH, file_name)
    s3_client.download_file(BUCKET_NAME, f"reports/{TASK_ID}/{file_name}", zip_path)

    # 5. Extract files
    with zipfile.ZipFile(zip_path, "r") as z:
        z.extractall(LOCAL_DOWNLOAD_PATH)
        extracted_files = z.namelist()
    print(f"Extracted files: {extracted_files}")

    # 6. Parse CSVs
    for f in extracted_files:
        if f.endswith(".csv"):
            fpath = os.path.join(LOCAL_DOWNLOAD_PATH, f)
            df = pd.read_csv(fpath)
            # Example: write to database
            # df.to_sql("market_insight", con=mysql_engine, if_exists="append", index=False)
            print(f"Processed file: {f}, rows: {len(df)}")

if __name__ == "__main__":
    fetch_from_sync_log()
Key Notes
● Idempotency: record processed zip keys locally (e.g., processed_<TASK_ID>.json) to avoid duplicate imports, or log them in your DB; see the sketch after this list.
● Use sync_log: read sync_log_<taskId>.csv for the names of successfully delivered files; fall back to S3 object listing if the log is unavailable.
● Table mapping: by default, file name market_insight_asin_161_20250826.csv maps to table market_insight_asin. A mapping table can be maintained for accuracy.
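For the idempotency note above, a minimal sketch of tracking processed zip keys in a local JSON file; the file name follows the processed_<TASK_ID>.json pattern suggested above and the example key is illustrative:
import json
import os

PROCESSED_FILE = "processed_161.json"  # follows the processed_<TASK_ID>.json pattern

def load_processed():
    """Return the set of zip keys that have already been imported."""
    if os.path.exists(PROCESSED_FILE):
        with open(PROCESSED_FILE) as f:
            return set(json.load(f))
    return set()

def mark_processed(processed, zip_key):
    """Add a key to the set and persist it."""
    processed.add(zip_key)
    with open(PROCESSED_FILE, "w") as f:
        json.dump(sorted(processed), f)

# Usage: skip keys already in the set; mark a key only after a successful import
processed = load_processed()
zip_key = "reports/161/market_insight_161_20250826_xxx.zip"
if zip_key not in processed:
    # ... download, extract, and load into the database here ...
    mark_processed(processed, zip_key)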
8. Historical Data Export
Besides recurring Data Hub tasks, users/developers can also perform one-time backfill exports to S3.
Use Cases
● At system onboarding, to import historical data (e.g., last 6 months).
● To recover missing data from specific days.
● For reconciliation or migration in data warehouses.
Instructions
1. In Data Hub, create a new task, select One-time push.
2. In the Report data range section, specify the export period (up to 1 year, depending on data source).
a. Example: 2024-01-01 to 2024-06-30.
3. Configure target storage (e.g., Amazon S3), complete permission validation, and submit task.
4. Retrieve historical data from the target location.
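Retrieving the backfilled files works the same way as for recurring tasks. A minimal boto3 sketch that downloads every zip package under the task directory, reusing the example bucket name and an assumed task ID of 161:
import os
import boto3

s3 = boto3.client("s3")
BUCKET_NAME = "company-xnurta-data"
TASK_ID = "161"  # ID of the one-time push task
DOWNLOAD_PATH = "./backfill"

os.makedirs(DOWNLOAD_PATH, exist_ok=True)
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=BUCKET_NAME, Prefix=f"reports/{TASK_ID}/"):
    for obj in page.get("Contents", []):
        key = obj["Key"]
        if key.endswith(".zip"):
            local_path = os.path.join(DOWNLOAD_PATH, os.path.basename(key))
            s3.download_file(BUCKET_NAME, key, local_path)
            print(f"Downloaded {key}")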