[data] support for specifying a dataset in cloud storage (#7567)
* add support for loading datasets from s3/gcs * add comments to readme * run linter and address comments * add option to pass in kwargs to ray init (i.e. runtime env) * address comment * revert mixed up changes
This commit is contained in:
@@ -12,9 +12,11 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
from enum import Enum, unique
|
||||
from typing import TYPE_CHECKING, Optional, TypedDict, Union
|
||||
|
||||
import fsspec
|
||||
from datasets import DatasetDict, concatenate_datasets, interleave_datasets
|
||||
|
||||
from ..extras import logging
|
||||
@@ -138,3 +140,50 @@ def get_dataset_module(dataset: Union["Dataset", "DatasetDict"]) -> "DatasetModu
|
||||
dataset_module["train_dataset"] = dataset
|
||||
|
||||
return dataset_module
|
||||
|
||||
|
||||
def setup_fs(path, anon=False):
    """Set up a filesystem object based on the path protocol.

    Args:
        path: Cloud path starting with 's3://', 'gs://' or 'gcs://'.
        anon: If truthy, request anonymous (credential-free) access.

    Returns:
        The filesystem object returned by ``fsspec.filesystem`` for the
        protocol of ``path``.

    Raises:
        ValueError: If ``path`` does not start with a supported protocol.
    """
    if path.startswith("s3://"):
        # The S3 backend selects anonymous access through its ``anon`` flag.
        storage_options = {"anon": anon} if anon else {}
        return fsspec.filesystem("s3", **storage_options)

    if path.startswith(("gs://", "gcs://")):
        # The GCS backend has no ``anon`` flag (an unknown keyword is silently
        # ignored); anonymous access is requested with token="anon" instead.
        storage_options = {"token": "anon"} if anon else {}
        return fsspec.filesystem("gcs", **storage_options)

    raise ValueError(f"Unsupported protocol in path: {path}. Use 's3://' or 'gs://'")
|
||||
|
||||
|
||||
def read_cloud_json(cloud_path):
    """Read a JSON/JSONL file from cloud storage (S3 or GCS).

    The file is first read through an anonymous filesystem; if that attempt
    raises any exception, the read is retried with a filesystem created
    without the anonymous flag.

    Args:
        cloud_path: Cloud path in one of the formats:
            - 's3://bucket-name/file.json' for AWS S3
            - 'gs://bucket-name/file.jsonl' or 'gcs://bucket-name/file.jsonl'
              for Google Cloud Storage
            A path ending in '.jsonl' is read as JSON Lines (one JSON object
            per line); any other path is read as a single JSON document.

    Returns:
        The parsed content as produced by ``_read_json_with_fs``.
    """

    def _attempt(**fs_kwargs):
        # One complete read attempt: build the filesystem, then parse the file.
        filesystem = setup_fs(cloud_path, **fs_kwargs)
        is_jsonl = cloud_path.endswith(".jsonl")
        return _read_json_with_fs(filesystem, cloud_path, lines=is_jsonl)

    try:
        # Public buckets need no credentials, so try anonymous access first.
        return _attempt(anon=True)
    except Exception:
        # Anonymous access failed; retry with a non-anonymous filesystem.
        return _attempt()
|
||||
|
||||
|
||||
def _read_json_with_fs(fs, path, lines=True):
|
||||
"""Helper function to read JSON/JSONL files using fsspec."""
|
||||
with fs.open(path, "r") as f:
|
||||
if lines:
|
||||
# Read JSONL (JSON Lines) format - one JSON object per line
|
||||
data = [json.loads(line) for line in f if line.strip()]
|
||||
else:
|
||||
# Read regular JSON format
|
||||
data = json.load(f)
|
||||
|
||||
return data
|
||||
|
||||
Reference in New Issue
Block a user