
[New] Developer Guide: Export Your Data to Amazon S3

Latest update: 09/03/2025

When users select Amazon S3 as the target storage for a Data Hub task, the following preparations are required on the AWS side.

 

1. Prerequisites

● A valid AWS account.

● Permissions to create S3 buckets and IAM users/policies.

● Access to the AWS Console.

 

2. Creating a Bucket

1.  Log in to the AWS S3 Console.

2.  Create a new bucket:

a.  If no S3 bucket exists yet, refer to AWS documentation on creating your first S3 bucket.

b.  Enter a globally unique bucket name (recommended: include business identifier, e.g., company-xnurta-data).

c.  Select a region (e.g., us-east-1, ap-northeast-1, cn-north-1).

d.  Keep the other options at their defaults and click Create bucket. The new bucket will appear in the bucket list.
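
If you prefer to script this step, the bucket can also be created with boto3. This is a minimal sketch, assuming AWS credentials are already configured in your environment; the bucket name and region below are placeholders and must be replaced with your own values.

import boto3

# Placeholder values; replace with your own bucket name and region.
BUCKET_NAME = "company-xnurta-data"
AWS_REGION = "ap-northeast-1"

s3 = boto3.client("s3", region_name=AWS_REGION)

# us-east-1 is the default location and must not set a LocationConstraint.
if AWS_REGION == "us-east-1":
    s3.create_bucket(Bucket=BUCKET_NAME)
else:
    s3.create_bucket(
        Bucket=BUCKET_NAME,
        CreateBucketConfiguration={"LocationConstraint": AWS_REGION},
    )
print(f"Created bucket {BUCKET_NAME} in {AWS_REGION}")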

 

3. Retrieving Bucket Information

● Bucket Name: visible in the bucket list (e.g., company-xnurta-data).

● Region: displayed in the top-right corner of the bucket details page (e.g., us-east-1).
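
If you need to confirm these values programmatically rather than in the console, a minimal boto3 sketch (the bucket name is a placeholder, and credentials are assumed to be configured in your environment):

import boto3

s3 = boto3.client("s3")

# Print all bucket names visible to these credentials.
for bucket in s3.list_buckets()["Buckets"]:
    print(bucket["Name"])

# get_bucket_location returns None for us-east-1, otherwise the region code.
resp = s3.get_bucket_location(Bucket="company-xnurta-data")
print(resp.get("LocationConstraint") or "us-east-1")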

 

4. Creating an IAM User and Access Keys

To ensure security, it is recommended to create a dedicated IAM user for Data Hub with write-only permissions to the target bucket.

1.  Go to IAM Console → Users → Add user.

2.  Enter a username (e.g., xnurta-data-task) and select Programmatic access.

3.  Click Attach policies directly → Create policy.

4.  In JSON mode, paste the following policy (replace <BUCKET_NAME>):

 

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:PutObjectAcl",
        "s3:GetObject"
      ],
      "Resource": [
        "arn:aws:s3:::<BUCKET_NAME>/*"
      ]
    }
  ]
}

 

 

5.  Attach the new policy and complete user creation.

6.  The system generates an Access Key ID and a Secret Access Key (shown only once). Save them securely.
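
The same IAM setup can also be scripted. The sketch below is illustrative only: it assumes boto3 with administrator-level credentials already configured, and the policy name xnurta-data-task-s3-write is an arbitrary choice, not something Data Hub requires.

import json
import boto3

iam = boto3.client("iam")

BUCKET_NAME = "company-xnurta-data"   # replace with your bucket name
USER_NAME = "xnurta-data-task"        # illustrative user name

policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:PutObject", "s3:PutObjectAcl", "s3:GetObject"],
            "Resource": [f"arn:aws:s3:::{BUCKET_NAME}/*"],
        }
    ],
}

# Create the policy and the user, then attach the policy to the user.
policy = iam.create_policy(
    PolicyName="xnurta-data-task-s3-write",
    PolicyDocument=json.dumps(policy_document),
)
iam.create_user(UserName=USER_NAME)
iam.attach_user_policy(UserName=USER_NAME, PolicyArn=policy["Policy"]["Arn"])

# The secret access key is returned only once; store it securely.
keys = iam.create_access_key(UserName=USER_NAME)["AccessKey"]
print("Access Key ID:", keys["AccessKeyId"])
print("Secret Access Key:", keys["SecretAccessKey"])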

5. Testing the Connection

Back in the Data Hub task creation page:

a.  Enter:

i.  Bucket Name: your bucket name.

ii.  Region: bucket region (e.g., us-east-1).

iii.  Access Key ID: IAM user’s key ID.

iv.  Secret Access Key: IAM user’s secret key.

b.  Click Test Connection.

i.  The system writes a test file into the bucket.

ii.  If successful: “Connection successful”.

iii.  If it fails, check the entered information.
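
Test Connection verifies that the credentials can write into the bucket. If you want to check them yourself before filling in the form, a minimal sketch (all values are placeholders, and the object key reports/connection_test.txt is only an example, not the key the system itself writes):

import boto3

s3 = boto3.client(
    "s3",
    aws_access_key_id="<Your-Access-Key-ID>",
    aws_secret_access_key="<Your-Secret-Access-Key>",
    region_name="us-east-1",
)

# Write a tiny test object; a permission or region problem raises a ClientError.
s3.put_object(
    Bucket="company-xnurta-data",
    Key="reports/connection_test.txt",
    Body=b"connection test",
)
print("Connection successful")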

 

6. Data File

Once configured, the system automatically creates a directory in the bucket, named after the task ID:

 

/reports/<taskId>/  

 

Example:

/reports/161/  

 

 

This directory contains:

● Sync log file: sync_log_<taskId>.csv — records execution history.

| Field | Type | Description |
| --- | --- | --- |
| Execution Record ID | bigint | Unique sequence number for the run (auto-incremented), for tracing/debugging. |
| Task ID | bigint | Unique ID of the data task (matches <taskId>). |
| Request ID | UUID | Unique ID for the request, for cross-service tracking. |
| Trigger Time | datetime | Time when the task was triggered by the scheduler. |
| Start Time | datetime | Time when execution started. |
| End Time | datetime | Time when execution finished or failed. |
| File Send Time | datetime | Time when file transfer to the target storage began. |
| Status | int | Execution status code: 0 pending / 1 running / 2 dispatch success / 3 dispatch fail / 4 file pending / 5 file generation fail / 6 file send success / 7 file send fail / 8 file download fail. |
| File Name | string | Name of the generated zip file (e.g., market_insight_161_20250826_xxx.zip). |
| File Size (B) | bigint | File size in bytes. |
| Failure Reason | string | Brief failure reason (permissions, network, timeout). Empty if successful. |

● Data zip package: <data_source>_<taskId>_<timestamp>.zip, containing one or more CSV files.
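
To see what a task has delivered so far, you can list the objects under its directory. A minimal boto3 sketch (bucket name and task ID are placeholders); note that the write-only policy from section 4 does not include s3:ListBucket, so listing requires credentials with that additional permission:

import boto3

s3 = boto3.client("s3")  # assumes credentials with s3:ListBucket on the bucket

BUCKET_NAME = "company-xnurta-data"
TASK_ID = "161"

# Everything under /reports/<taskId>/: the sync log plus the zip packages.
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=BUCKET_NAME, Prefix=f"reports/{TASK_ID}/"):
    for obj in page.get("Contents", []):
        print(obj["Key"], obj["Size"], obj["LastModified"])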

 

7. Sample Code: Daily Parsing and System Update

Use case: an enterprise processes Data Hub’s recurring outputs daily to update internal systems or a data warehouse.

Logic outline:

● Only process “unprocessed” zip files.

● Prefer sync_log_<taskId>.csv to determine which files are complete; fall back to the S3 object list if it is unavailable.

● Unzip the packages and load the tables into the database.

● Handle retries and idempotency.

Note: The following code demonstrates the logic only. Actual implementation must be adapted to the enterprise’s own architecture, database type, security, and operations processes.

import boto3
import zipfile
import os
import pandas as pd
from io import BytesIO
 
# ====== Configuration ======
AWS_ACCESS_KEY_ID = "<Your-Access-Key-ID>"
AWS_SECRET_ACCESS_KEY = "<Your-Secret-Access-Key>"
AWS_REGION = "us-east-1"
BUCKET_NAME = "company-xnurta-data"
TASK_ID = "161"  # Replace with your task ID
LOCAL_DOWNLOAD_PATH = "./downloads"
 
# ====== Initialize S3 Client ======
s3_client = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=AWS_REGION
)
 
def fetch_from_sync_log():
    """
    Determine which file to download via sync_log_<taskId>.csv
    """
    # 1. Download sync_log file
    log_key = f"reports/{TASK_ID}/sync_log_{TASK_ID}.csv"
    log_obj = s3_client.get_object(Bucket=BUCKET_NAME, Key=log_key)
    log_df = pd.read_csv(log_obj["Body"])
 
    # 2. Filter successful records (status = 6 means success)
    success_logs = log_df[log_df["Status"] == 6]
 
    if success_logs.empty:
        print("No successful task records found")
        return
 
    # 3. Get the latest successful record
    latest_record = success_logs.sort_values("File Send Time").iloc[-1]
    file_name = latest_record["File Name"]
 
    print(f"Downloading file: {file_name}")
 
    # 4. Download zip package
    os.makedirs(LOCAL_DOWNLOAD_PATH, exist_ok=True)
    zip_path = os.path.join(LOCAL_DOWNLOAD_PATH, file_name)
 
    s3_client.download_file(BUCKET_NAME, f"reports/{TASK_ID}/{file_name}", zip_path)
 
    # 5. Extract files
    with zipfile.ZipFile(zip_path, "r") as z:
        z.extractall(LOCAL_DOWNLOAD_PATH)
        extracted_files = z.namelist()
 
    print(f"Extracted files: {extracted_files}")
 
    # 6. Parse CSVs
    for f in extracted_files:
        if f.endswith(".csv"):
            fpath = os.path.join(LOCAL_DOWNLOAD_PATH, f)
            df = pd.read_csv(fpath)
            # Example: write to database
            # df.to_sql("market_insight", con=mysql_engine, if_exists="append", index=False)
            print(f"Processed file: {f}, rows: {len(df)}")
 
if __name__ == "__main__":
    fetch_from_sync_log()

 

 

Key Notes

● Idempotency: record processed zip keys locally (e.g., processed_<TASK_ID>.json) to avoid duplicate imports, or log them in your DB.

● Use sync_log: read sync_log_<taskId>.csv for the names of successfully sent files; fall back to S3 object listing if it is unavailable.

● Table mapping: by default, file name market_insight_asin_161_20250826.csv maps to table market_insight_asin. A mapping table can be maintained for accuracy.
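
A minimal sketch of the idempotency and table-mapping notes above, assuming a local processed_<TASK_ID>.json file and the default file-name-to-table convention; both are illustrative choices, not part of the Data Hub contract:

import json
import os
import re

TASK_ID = "161"
PROCESSED_FILE = f"processed_{TASK_ID}.json"

def load_processed() -> set:
    """Return the set of zip files that have already been imported."""
    if os.path.exists(PROCESSED_FILE):
        with open(PROCESSED_FILE) as f:
            return set(json.load(f))
    return set()

def mark_processed(processed: set, zip_name: str) -> None:
    """Record a zip file so that reruns skip it (idempotency)."""
    processed.add(zip_name)
    with open(PROCESSED_FILE, "w") as f:
        json.dump(sorted(processed), f)

def table_for_csv(file_name: str) -> str:
    """Map e.g. market_insight_asin_161_20250826.csv -> market_insight_asin."""
    stem = os.path.splitext(os.path.basename(file_name))[0]
    # Strip the trailing <taskId>_<date> suffix; keep the data-source prefix.
    return re.sub(r"_\d+_\d{8}.*$", "", stem)

# Usage inside the daily job:
# processed = load_processed()
# if file_name not in processed:
#     ... download, extract, load each CSV into table_for_csv(csv_name) ...
#     mark_processed(processed, file_name)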

 

8. Historical Data Export

Besides recurring Data Hub tasks, users/developers can also perform one-time backfill exports to S3.

Use Cases

● At system onboarding, to import historical data (e.g., last 6 months).

● To recover missing data from specific days.

● For reconciliation or migration in data warehouses.

Instructions

1.  In Data Hub, create a new task and select One-time push.

2.  In the Report data range section, specify the export period (up to 1 year, depending on data source).

a.  Example: 2024-01-01 to 2024-06-30.

3.  Configure the target storage (e.g., Amazon S3), complete permission validation, and submit the task.

4.  Retrieve historical data from the target location.
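
A one-time backfill usually produces more than one zip package, so instead of taking only the latest record (as in the sample in section 7), iterate over every successful row of the sync log. A minimal sketch reusing the same placeholders and assuming configured credentials:

import os
import zipfile
import boto3
import pandas as pd

BUCKET_NAME = "company-xnurta-data"
TASK_ID = "161"                      # the backfill task's ID
LOCAL_DOWNLOAD_PATH = "./downloads"

s3 = boto3.client("s3")

# Read the sync log and keep every successful send (Status == 6).
log_key = f"reports/{TASK_ID}/sync_log_{TASK_ID}.csv"
log_df = pd.read_csv(s3.get_object(Bucket=BUCKET_NAME, Key=log_key)["Body"])
success = log_df[log_df["Status"] == 6]

os.makedirs(LOCAL_DOWNLOAD_PATH, exist_ok=True)
for file_name in success["File Name"].unique():
    zip_path = os.path.join(LOCAL_DOWNLOAD_PATH, file_name)
    s3.download_file(BUCKET_NAME, f"reports/{TASK_ID}/{file_name}", zip_path)
    with zipfile.ZipFile(zip_path) as z:
        z.extractall(LOCAL_DOWNLOAD_PATH)
    print(f"Downloaded and extracted {file_name}")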
