```python
class DataFrameSerializationFormat(Enum):
    """
    An enumeration class to represent different file formats and
    compression options for upload_from_dataframe.

    Attributes:
        CSV: Representation for 'csv' file format with no compression
            and its related content type and suffix.
        CSV_GZIP: Representation for 'csv' file format with 'gzip' compression
            and its related content type and suffix.
        PARQUET: Representation for 'parquet' file format with no compression
            and its related content type and suffix.
        PARQUET_SNAPPY: Representation for 'parquet' file format
            with 'snappy' compression and its related content type and suffix.
        PARQUET_GZIP: Representation for 'parquet' file format
            with 'gzip' compression and its related content type and suffix.
    """

    CSV = ("csv", None, "text/csv", ".csv")
    CSV_GZIP = ("csv", "gzip", "application/x-gzip", ".csv.gz")
    PARQUET = ("parquet", None, "application/octet-stream", ".parquet")
    PARQUET_SNAPPY = (
        "parquet",
        "snappy",
        "application/octet-stream",
        ".snappy.parquet",
    )
    PARQUET_GZIP = ("parquet", "gzip", "application/octet-stream", ".gz.parquet")

    @property
    def format(self) -> str:
        """The file format of the current instance."""
        return self.value[0]

    @property
    def compression(self) -> Union[str, None]:
        """The compression type of the current instance."""
        return self.value[1]

    @property
    def content_type(self) -> str:
        """The content type of the current instance."""
        return self.value[2]

    @property
    def suffix(self) -> str:
        """The suffix of the file format of the current instance."""
        return self.value[3]

    def fix_extension_with(self, gcs_blob_path: str) -> str:
        """Fix the extension of a GCS blob.

        Args:
            gcs_blob_path: The path to the GCS blob to be modified.

        Returns:
            The modified path to the GCS blob with the new extension.
        """
        gcs_blob_path = PurePosixPath(gcs_blob_path)
        folder = gcs_blob_path.parent
        filename = PurePosixPath(gcs_blob_path.stem).with_suffix(self.suffix)
        return str(folder.joinpath(filename))
```
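Each member bundles `(format, compression, content_type, suffix)`, and `fix_extension_with` swaps a blob path's extension for the member's suffix. A quick illustrative check, with values that follow directly from the enum above:

```python
from prefect_gcp.cloud_storage import DataFrameSerializationFormat

fmt = DataFrameSerializationFormat.CSV_GZIP
print(fmt.format, fmt.compression, fmt.content_type, fmt.suffix)
# csv gzip application/x-gzip .csv.gz

# the original extension is replaced with the member's suffix
print(fmt.fix_extension_with("folder/data.parquet"))
# folder/data.csv.gz
```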
Block used to store data using GCP Cloud Storage Buckets.
Note: `GcsBucket` in `prefect-gcp` is a unique block, separate from `GCS`
in core Prefect. `GcsBucket` does not use `gcsfs` under the hood; it uses
the `google-cloud-storage` package instead, and offers more configuration
and functionality.
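For a quick sense of the block's basic read/write API, here is a minimal, hypothetical round trip; the block name `"BLOCK_NAME"` is a placeholder for a block you have already saved:

```python
from prefect_gcp.cloud_storage import GcsBucket

gcs_bucket = GcsBucket.load("BLOCK_NAME")  # placeholder block name

# keys are resolved relative to the block's configured bucket_folder
gcs_bucket.write_path("folder/notes.txt", content=b"hello")
assert gcs_bucket.read_path("folder/notes.txt") == b"hello"
```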
````python
class GcsBucket(WritableDeploymentStorage, WritableFileSystem, ObjectStorageBlock):
    """
    Block used to store data using GCP Cloud Storage Buckets.

    Note! `GcsBucket` in `prefect-gcp` is a unique block, separate from `GCS`
    in core Prefect. `GcsBucket` does not use `gcsfs` under the hood,
    instead using the `google-cloud-storage` package, and offers more configuration
    and functionality.

    Attributes:
        bucket: Name of the bucket.
        gcp_credentials: The credentials to authenticate with GCP.
        bucket_folder: A default path to a folder within the GCS bucket to use
            for reading and writing objects.

    Example:
        Load stored GCP Cloud Storage Bucket:
        ```python
        from prefect_gcp.cloud_storage import GcsBucket

        gcp_cloud_storage_bucket_block = GcsBucket.load("BLOCK_NAME")
        ```
    """

    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/10424e311932e31c477ac2b9ef3d53cefbaad708-250x250.png"  # noqa
    _block_type_name = "GCS Bucket"
    _documentation_url = "https://prefecthq.github.io/prefect-gcp/cloud_storage/#prefect_gcp.cloud_storage.GcsBucket"  # noqa: E501

    bucket: str = Field(..., description="Name of the bucket.")
    gcp_credentials: GcpCredentials = Field(
        default_factory=GcpCredentials,
        description="The credentials to authenticate with GCP.",
    )
    bucket_folder: str = Field(
        default="",
        description=(
            "A default path to a folder within the GCS bucket to use "
            "for reading and writing objects."
        ),
    )

    @property
    def basepath(self) -> str:
        """
        Read-only property that mirrors the bucket folder.

        Used for deployment.
        """
        return self.bucket_folder

    @validator("bucket_folder", pre=True, always=True)
    def _bucket_folder_suffix(cls, value):
        """
        Ensures that the bucket folder is suffixed with a forward slash.
        """
        if value != "" and not value.endswith("/"):
            value = f"{value}/"
        return value

    def _resolve_path(self, path: str) -> str:
        """
        A helper function used in write_path to join `self.bucket_folder`
        and `path`.

        Args:
            path: Name of the key, e.g. "file1". Each object in your
                bucket has a unique key (or key name).

        Returns:
            The joined path.
        """
        # If bucket_folder provided, it means we won't write to the root dir of
        # the bucket. So we need to add it on the front of the path.
        path = (
            str(PurePosixPath(self.bucket_folder, path)) if self.bucket_folder else path
        )
        if path in ["", ".", "/"]:
            # client.bucket.list_blobs(prefix=None) is the proper way
            # of specifying the root folder of the bucket
            path = None
        return path

    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> List[Union[str, Path]]:
        """
        Copies a folder from the configured GCS bucket to a local directory.
        Defaults to copying the entire contents of the block's bucket_folder
        to the current working directory.

        Args:
            from_path: Path in GCS bucket to download from. Defaults to the block's
                configured bucket_folder.
            local_path: Local path to download GCS bucket contents to.
                Defaults to the current working directory.

        Returns:
            A list of downloaded file paths.
        """
        from_path = (
            self.bucket_folder if from_path is None else self._resolve_path(from_path)
        )

        if local_path is None:
            local_path = os.path.abspath(".")
        else:
            local_path = os.path.abspath(os.path.expanduser(local_path))

        project = self.gcp_credentials.project
        client = self.gcp_credentials.get_cloud_storage_client(project=project)

        blobs = await run_sync_in_worker_thread(
            client.list_blobs, self.bucket, prefix=from_path
        )

        file_paths = []
        for blob in blobs:
            blob_path = blob.name
            if blob_path[-1] == "/":
                # object is a folder and will be created if it contains any objects
                continue
            local_file_path = os.path.join(local_path, blob_path)
            os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

            with disable_run_logger():
                file_path = await cloud_storage_download_blob_to_file.fn(
                    bucket=self.bucket,
                    blob=blob_path,
                    path=local_file_path,
                    gcp_credentials=self.gcp_credentials,
                )
                file_paths.append(file_path)
        return file_paths

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> int:
        """
        Uploads a directory from a given local path to the configured GCS bucket in
        a given folder.

        Defaults to uploading the entire contents of the current working directory
        to the block's bucket_folder.

        Args:
            local_path: Path to local directory to upload from.
            to_path: Path in GCS bucket to upload to. Defaults to block's configured
                bucket_folder.
            ignore_file: Path to file containing gitignore style expressions for
                filepaths to ignore.

        Returns:
            The number of files uploaded.
        """
        if local_path is None:
            local_path = os.path.abspath(".")
        else:
            local_path = os.path.expanduser(local_path)

        to_path = self.bucket_folder if to_path is None else self._resolve_path(to_path)

        included_files = None
        if ignore_file:
            with open(ignore_file, "r") as f:
                ignore_patterns = f.readlines()
            included_files = filter_files(local_path, ignore_patterns)

        uploaded_file_count = 0
        for local_file_path in Path(local_path).rglob("*"):
            if (
                included_files is not None
                and str(local_file_path.relative_to(local_path)) not in included_files
            ):
                continue
            elif not local_file_path.is_dir():
                remote_file_path = str(
                    PurePosixPath(to_path, local_file_path.relative_to(local_path))
                )
                local_file_content = local_file_path.read_bytes()
                await self.write_path(remote_file_path, content=local_file_content)
                uploaded_file_count += 1

        return uploaded_file_count

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """
        Read specified path from GCS and return contents. Provide the entire
        path to the key in GCS.

        Args:
            path: Entire path to (and including) the key.

        Returns:
            A bytes or string representation of the blob object.
        """
        path = self._resolve_path(path)
        with disable_run_logger():
            contents = await cloud_storage_download_blob_as_bytes.fn(
                bucket=self.bucket, blob=path, gcp_credentials=self.gcp_credentials
            )
        return contents

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """
        Writes to a GCS bucket.

        Args:
            path: The key name. Each object in your bucket has a unique
                key (or key name).
            content: What you are uploading to GCS Bucket.

        Returns:
            The path that the contents were written to.
        """
        path = self._resolve_path(path)
        with disable_run_logger():
            await cloud_storage_upload_blob_from_string.fn(
                data=content,
                bucket=self.bucket,
                blob=path,
                gcp_credentials=self.gcp_credentials,
            )
        return path

    # NEW BLOCK INTERFACE METHODS BELOW
    def _join_bucket_folder(self, bucket_path: str = "") -> str:
        """
        Joins the base bucket folder to the bucket path.

        NOTE: If a method reuses another method in this class, be careful to not
        call this twice because it'll join the bucket folder twice.
        See https://github.com/PrefectHQ/prefect-aws/issues/141 for a past issue.
        """
        bucket_path = str(bucket_path)
        if self.bucket_folder != "" and bucket_path.startswith(self.bucket_folder):
            self.logger.info(
                f"Bucket path {bucket_path!r} is already prefixed with "
                f"bucket folder {self.bucket_folder!r}; is this intentional?"
            )

        bucket_path = str(PurePosixPath(self.bucket_folder) / bucket_path)
        if bucket_path in ["", ".", "/"]:
            # client.bucket.list_blobs(prefix=None) is the proper way
            # of specifying the root folder of the bucket
            bucket_path = None
        return bucket_path

    @sync_compatible
    async def create_bucket(
        self, location: Optional[str] = None, **create_kwargs
    ) -> "Bucket":
        """
        Creates a bucket.

        Args:
            location: The location of the bucket.
            **create_kwargs: Additional keyword arguments to pass to the
                `create_bucket` method.

        Returns:
            The bucket object.

        Examples:
            Create a bucket.
            ```python
            from prefect_gcp.cloud_storage import GcsBucket

            gcs_bucket = GcsBucket(bucket="my-bucket")
            gcs_bucket.create_bucket()
            ```
        """
        self.logger.info(f"Creating bucket {self.bucket!r}.")
        client = self.gcp_credentials.get_cloud_storage_client()
        bucket = await run_sync_in_worker_thread(
            client.create_bucket, self.bucket, location=location, **create_kwargs
        )
        return bucket

    @sync_compatible
    async def get_bucket(self) -> "Bucket":
        """
        Returns the bucket object.

        Returns:
            The bucket object.

        Examples:
            Get the bucket object.
            ```python
            from prefect_gcp.cloud_storage import GcsBucket

            gcs_bucket = GcsBucket.load("my-bucket")
            gcs_bucket.get_bucket()
            ```
        """
        self.logger.info(f"Getting bucket {self.bucket!r}.")
        client = self.gcp_credentials.get_cloud_storage_client()
        bucket = await run_sync_in_worker_thread(client.get_bucket, self.bucket)
        return bucket

    @sync_compatible
    async def list_blobs(self, folder: str = "") -> List["Blob"]:
        """
        Lists all blobs in the bucket that are in a folder.
        Folders are not included in the output.

        Args:
            folder: The folder to list blobs from.

        Returns:
            A list of Blob objects.

        Examples:
            Get all blobs from a folder named "prefect".
            ```python
            from prefect_gcp.cloud_storage import GcsBucket

            gcs_bucket = GcsBucket.load("my-bucket")
            gcs_bucket.list_blobs("prefect")
            ```
        """
        client = self.gcp_credentials.get_cloud_storage_client()

        bucket_path = self._join_bucket_folder(folder)
        if bucket_path is None:
            self.logger.info(f"Listing blobs in bucket {self.bucket!r}.")
        else:
            self.logger.info(
                f"Listing blobs in folder {bucket_path!r} in bucket {self.bucket!r}."
            )
        blobs = await run_sync_in_worker_thread(
            client.list_blobs, self.bucket, prefix=bucket_path
        )

        # Ignore folders
        return [blob for blob in blobs if not blob.name.endswith("/")]

    @sync_compatible
    async def list_folders(self, folder: str = "") -> List[str]:
        """
        Lists all folders and subfolders in the bucket.

        Args:
            folder: List all folders and subfolders inside given folder.

        Returns:
            A list of folders.

        Examples:
            Get all folders from a bucket named "my-bucket".
            ```python
            from prefect_gcp.cloud_storage import GcsBucket

            gcs_bucket = GcsBucket.load("my-bucket")
            gcs_bucket.list_folders()
            ```

            Get all folders from a folder called years.
            ```python
            from prefect_gcp.cloud_storage import GcsBucket

            gcs_bucket = GcsBucket.load("my-bucket")
            gcs_bucket.list_folders("years")
            ```
        """
        # Beware of calling _join_bucket_folder twice, see note in method.
        # However, we just want to use it to check if we are listing the root folder
        bucket_path = self._join_bucket_folder(folder)
        if bucket_path is None:
            self.logger.info(f"Listing folders in bucket {self.bucket!r}.")
        else:
            self.logger.info(
                f"Listing folders in {bucket_path!r} in bucket {self.bucket!r}."
            )

        blobs = await self.list_blobs(folder)
        # gets all folders with full path
        folders = {str(PurePosixPath(blob.name).parent) for blob in blobs}

        return [folder for folder in folders if folder != "."]

    @sync_compatible
    async def download_object_to_path(
        self,
        from_path: str,
        to_path: Optional[Union[str, Path]] = None,
        **download_kwargs: Dict[str, Any],
    ) -> Path:
        """
        Downloads an object from the object storage service to a path.

        Args:
            from_path: The path to the blob to download; this gets prefixed
                with the bucket_folder.
            to_path: The path to download the blob to. If not provided, the
                blob's name will be used.
            **download_kwargs: Additional keyword arguments to pass to
                `Blob.download_to_filename`.

        Returns:
            The absolute path that the object was downloaded to.

        Examples:
            Download my_folder/notes.txt object to notes.txt.
            ```python
            from prefect_gcp.cloud_storage import GcsBucket

            gcs_bucket = GcsBucket.load("my-bucket")
            gcs_bucket.download_object_to_path("my_folder/notes.txt", "notes.txt")
            ```
        """
        if to_path is None:
            to_path = Path(from_path).name

        # making path absolute, but converting back to str here
        # since !r looks nicer that way and filename arg expects str
        to_path = str(Path(to_path).absolute())

        bucket = await self.get_bucket()
        bucket_path = self._join_bucket_folder(from_path)
        blob = bucket.blob(bucket_path)
        self.logger.info(
            f"Downloading blob from bucket {self.bucket!r} path {bucket_path!r} "
            f"to {to_path!r}."
        )

        await run_sync_in_worker_thread(
            blob.download_to_filename, filename=to_path, **download_kwargs
        )
        return Path(to_path)

    @sync_compatible
    async def download_object_to_file_object(
        self,
        from_path: str,
        to_file_object: BinaryIO,
        **download_kwargs: Dict[str, Any],
    ) -> BinaryIO:
        """
        Downloads an object from the object storage service to a file-like object,
        which can be a BytesIO object or a BufferedWriter.

        Args:
            from_path: The path to the blob to download from; this gets prefixed
                with the bucket_folder.
            to_file_object: The file-like object to download the blob to.
            **download_kwargs: Additional keyword arguments to pass to
                `Blob.download_to_file`.

        Returns:
            The file-like object that the object was downloaded to.

        Examples:
            Download my_folder/notes.txt object to a BytesIO object.
            ```python
            from io import BytesIO

            from prefect_gcp.cloud_storage import GcsBucket

            gcs_bucket = GcsBucket.load("my-bucket")
            with BytesIO() as buf:
                gcs_bucket.download_object_to_file_object("my_folder/notes.txt", buf)
            ```

            Download my_folder/notes.txt object to a BufferedWriter.
            ```python
            from prefect_gcp.cloud_storage import GcsBucket

            gcs_bucket = GcsBucket.load("my-bucket")
            with open("notes.txt", "wb") as f:
                gcs_bucket.download_object_to_file_object("my_folder/notes.txt", f)
            ```
        """
        bucket = await self.get_bucket()

        bucket_path = self._join_bucket_folder(from_path)
        blob = bucket.blob(bucket_path)
        self.logger.info(
            f"Downloading blob from bucket {self.bucket!r} path {bucket_path!r} "
            f"to file object."
        )

        await run_sync_in_worker_thread(
            blob.download_to_file, file_obj=to_file_object, **download_kwargs
        )
        return to_file_object

    @sync_compatible
    async def download_folder_to_path(
        self,
        from_folder: str,
        to_folder: Optional[Union[str, Path]] = None,
        **download_kwargs: Dict[str, Any],
    ) -> Path:
        """
        Downloads objects *within* a folder (excluding the folder itself)
        from the object storage service to a folder.

        Args:
            from_folder: The path to the folder to download from; this gets prefixed
                with the bucket_folder.
            to_folder: The path to download the folder to. If not provided, will
                default to the current directory.
            **download_kwargs: Additional keyword arguments to pass to
                `Blob.download_to_filename`.

        Returns:
            The absolute path that the folder was downloaded to.

        Examples:
            Download my_folder to a local folder named my_folder.
            ```python
            from prefect_gcp.cloud_storage import GcsBucket

            gcs_bucket = GcsBucket.load("my-bucket")
            gcs_bucket.download_folder_to_path("my_folder", "my_folder")
            ```
        """
        if to_folder is None:
            to_folder = ""
        to_folder = Path(to_folder).absolute()

        blobs = await self.list_blobs(folder=from_folder)
        if len(blobs) == 0:
            self.logger.warning(
                f"No blobs were downloaded from "
                f"bucket {self.bucket!r} path {from_folder!r}."
            )
            return to_folder

        # do not call self._join_bucket_folder for list_blobs
        # because it's built-in to that method already!
        # however, we still need to do it because we're using relative_to
        bucket_folder = self._join_bucket_folder(from_folder)

        async_coros = []
        for blob in blobs:
            bucket_path = PurePosixPath(blob.name).relative_to(bucket_folder)
            if str(bucket_path).endswith("/"):
                continue
            to_path = to_folder / bucket_path
            to_path.parent.mkdir(parents=True, exist_ok=True)
            self.logger.info(
                f"Downloading blob from bucket {self.bucket!r} path "
                f"{str(bucket_path)!r} to {to_path}."
            )
            async_coros.append(
                run_sync_in_worker_thread(
                    blob.download_to_filename, filename=str(to_path), **download_kwargs
                )
            )
        await asyncio.gather(*async_coros)

        return to_folder

    @sync_compatible
    async def upload_from_path(
        self,
        from_path: Union[str, Path],
        to_path: Optional[str] = None,
        **upload_kwargs: Dict[str, Any],
    ) -> str:
        """
        Uploads an object from a path to the object storage service.

        Args:
            from_path: The path to the file to upload from.
            to_path: The path to upload the file to. If not provided, will use
                the file name of from_path; this gets prefixed
                with the bucket_folder.
            **upload_kwargs: Additional keyword arguments to pass to
                `Blob.upload_from_filename`.

        Returns:
            The path that the object was uploaded to.

        Examples:
            Upload notes.txt to my_folder/notes.txt.
            ```python
            from prefect_gcp.cloud_storage import GcsBucket

            gcs_bucket = GcsBucket.load("my-bucket")
            gcs_bucket.upload_from_path("notes.txt", "my_folder/notes.txt")
            ```
        """
        if to_path is None:
            to_path = Path(from_path).name

        bucket_path = self._join_bucket_folder(to_path)
        bucket = await self.get_bucket()
        blob = bucket.blob(bucket_path)
        self.logger.info(
            f"Uploading from {from_path!r} to the bucket "
            f"{self.bucket!r} path {bucket_path!r}."
        )

        await run_sync_in_worker_thread(
            blob.upload_from_filename, filename=from_path, **upload_kwargs
        )
        return bucket_path

    @sync_compatible
    async def upload_from_file_object(
        self, from_file_object: BinaryIO, to_path: str, **upload_kwargs
    ) -> str:
        """
        Uploads an object to the object storage service from a file-like object,
        which can be a BytesIO object or a BufferedReader.

        Args:
            from_file_object: The file-like object to upload from.
            to_path: The path to upload the object to; this gets prefixed
                with the bucket_folder.
            **upload_kwargs: Additional keyword arguments to pass to
                `Blob.upload_from_file`.

        Returns:
            The path that the object was uploaded to.

        Examples:
            Upload my_folder/notes.txt object to a BytesIO object.
            ```python
            from io import BytesIO

            from prefect_gcp.cloud_storage import GcsBucket

            gcs_bucket = GcsBucket.load("my-bucket")
            with open("notes.txt", "rb") as f:
                gcs_bucket.upload_from_file_object(f, "my_folder/notes.txt")
            ```

            Upload BufferedReader object to my_folder/notes.txt.
            ```python
            from io import BufferedReader

            from prefect_gcp.cloud_storage import GcsBucket

            gcs_bucket = GcsBucket.load("my-bucket")
            with open("notes.txt", "rb") as f:
                gcs_bucket.upload_from_file_object(
                    BufferedReader(f), "my_folder/notes.txt"
                )
            ```
        """
        bucket = await self.get_bucket()

        bucket_path = self._join_bucket_folder(to_path)
        blob = bucket.blob(bucket_path)
        self.logger.info(
            f"Uploading from file object to the bucket "
            f"{self.bucket!r} path {bucket_path!r}."
        )

        await run_sync_in_worker_thread(
            blob.upload_from_file, from_file_object, **upload_kwargs
        )
        return bucket_path

    @sync_compatible
    async def upload_from_folder(
        self,
        from_folder: Union[str, Path],
        to_folder: Optional[str] = None,
        **upload_kwargs: Dict[str, Any],
    ) -> str:
        """
        Uploads files *within* a folder (excluding the folder itself)
        to the object storage service folder.

        Args:
            from_folder: The path to the folder to upload from.
            to_folder: The path to upload the folder to. If not provided, will
                default to bucket_folder or the base directory of the bucket.
            **upload_kwargs: Additional keyword arguments to pass to
                `Blob.upload_from_filename`.

        Returns:
            The path that the folder was uploaded to.

        Examples:
            Upload local folder my_folder to the bucket's folder my_folder.
            ```python
            from prefect_gcp.cloud_storage import GcsBucket

            gcs_bucket = GcsBucket.load("my-bucket")
            gcs_bucket.upload_from_folder("my_folder")
            ```
        """
        from_folder = Path(from_folder)
        # join bucket folder expects string for the first input
        # when it returns None, we need to convert it back to empty string
        # so relative_to works
        bucket_folder = self._join_bucket_folder(to_folder or "") or ""

        num_uploaded = 0
        bucket = await self.get_bucket()

        async_coros = []
        for from_path in from_folder.rglob("**/*"):
            if from_path.is_dir():
                continue
            bucket_path = str(Path(bucket_folder) / from_path.relative_to(from_folder))
            self.logger.info(
                f"Uploading from {str(from_path)!r} to the bucket "
                f"{self.bucket!r} path {bucket_path!r}."
            )
            blob = bucket.blob(bucket_path)
            async_coros.append(
                run_sync_in_worker_thread(
                    blob.upload_from_filename, filename=from_path, **upload_kwargs
                )
            )
            num_uploaded += 1
        await asyncio.gather(*async_coros)

        if num_uploaded == 0:
            self.logger.warning(f"No files were uploaded from {from_folder}.")
        return bucket_folder

    @sync_compatible
    async def upload_from_dataframe(
        self,
        df: "DataFrame",
        to_path: str,
        serialization_format: Union[
            str, DataFrameSerializationFormat
        ] = DataFrameSerializationFormat.CSV_GZIP,
        **upload_kwargs: Dict[str, Any],
    ) -> str:
        """Upload a Pandas DataFrame to Google Cloud Storage in various formats.

        This function uploads the data in a Pandas DataFrame to Google Cloud Storage
        in a specified format, such as .csv, .csv.gz, .parquet,
        .snappy.parquet, and .gz.parquet.

        Args:
            df: The Pandas DataFrame to be uploaded.
            to_path: The destination path for the uploaded DataFrame.
            serialization_format: The format to serialize the DataFrame into.
                When passed as a `str`, the valid options are:
                'csv', 'csv_gzip', 'parquet', 'parquet_snappy', 'parquet_gzip'.
                Defaults to `DataFrameSerializationFormat.CSV_GZIP`.
            **upload_kwargs: Additional keyword arguments to pass to the underlying
                `Blob.upload_from_file` method.

        Returns:
            The path that the object was uploaded to.
        """
        if isinstance(serialization_format, str):
            serialization_format = DataFrameSerializationFormat[
                serialization_format.upper()
            ]
        with BytesIO() as bytes_buffer:
            if serialization_format.format == "parquet":
                df.to_parquet(
                    path=bytes_buffer,
                    compression=serialization_format.compression,
                    index=False,
                )
            elif serialization_format.format == "csv":
                df.to_csv(
                    path_or_buf=bytes_buffer,
                    compression=serialization_format.compression,
                    index=False,
                )

            bytes_buffer.seek(0)
            to_path = serialization_format.fix_extension_with(gcs_blob_path=to_path)

            return await self.upload_from_file_object(
                from_file_object=bytes_buffer,
                to_path=to_path,
                **{"content_type": serialization_format.content_type, **upload_kwargs},
            )
````
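All of the public methods above are wrapped with Prefect's `@sync_compatible` decorator, so the same method can be called synchronously from regular code or awaited inside an async flow. A minimal sketch, assuming a saved block named "my-bucket" and a local notes.txt (both are placeholders):

```python
from prefect import flow

from prefect_gcp.cloud_storage import GcsBucket


@flow
async def async_usage():
    # inside an async flow, sync_compatible methods are awaited
    gcs_bucket = await GcsBucket.load("my-bucket")  # placeholder block name
    await gcs_bucket.upload_from_path("notes.txt", "my_folder/notes.txt")


# outside an async context, the same methods run synchronously
gcs_bucket = GcsBucket.load("my-bucket")
gcs_bucket.download_object_to_path("my_folder/notes.txt", "notes.txt")
```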
`get_directory` copies a folder from the configured GCS bucket to a local directory.
It defaults to copying the entire contents of the block's bucket_folder
to the current working directory. A short usage sketch follows the parameter table below.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `from_path` | `Optional[str]` | Path in GCS bucket to download from. Defaults to the block's configured bucket_folder. | `None` |
| `local_path` | `Optional[str]` | Local path to download GCS bucket contents to. Defaults to the current working directory. | `None` |
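As referenced above, a minimal sketch of `get_directory`; the block name and paths are placeholders:

```python
from prefect_gcp.cloud_storage import GcsBucket

gcs_bucket = GcsBucket.load("my-bucket")  # placeholder block name

# copy everything under <bucket_folder>/data into ./local_data
file_paths = gcs_bucket.get_directory(from_path="data", local_path="local_data")
print(file_paths)  # the list of downloaded file paths
```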
`upload_from_dataframe` uploads a Pandas DataFrame to Google Cloud Storage in a chosen
serialization format: .csv, .csv.gz, .parquet, .snappy.parquet, or .gz.parquet.
The destination path's extension is rewritten to match the chosen format
(via `fix_extension_with` above). A short usage sketch follows the parameter table below.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `DataFrame` | The Pandas DataFrame to be uploaded. | *required* |
| `to_path` | `str` | The destination path for the uploaded DataFrame. | *required* |
| `serialization_format` | `Union[str, DataFrameSerializationFormat]` | The format to serialize the DataFrame into. When passed as a `str`, the valid options are: `'csv'`, `'csv_gzip'`, `'parquet'`, `'parquet_snappy'`, `'parquet_gzip'`. Defaults to `DataFrameSerializationFormat.CSV_GZIP`. | `CSV_GZIP` |
| `**upload_kwargs` | `Dict[str, Any]` | Additional keyword arguments to pass to the underlying `Blob.upload_from_file` method. | `{}` |
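A sketch of what a call might look like; the block name and DataFrame are placeholders:

```python
import pandas as pd

from prefect_gcp.cloud_storage import GcsBucket

gcs_bucket = GcsBucket.load("my-bucket")  # placeholder block name
df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})

# uploads to reports/data.gz.parquet regardless of the .csv suffix given,
# because fix_extension_with swaps the extension for the chosen format
path = gcs_bucket.upload_from_dataframe(
    df, to_path="reports/data.csv", serialization_format="parquet_gzip"
)
```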
````python
@task
async def cloud_storage_copy_blob(
    source_bucket: str,
    dest_bucket: str,
    source_blob: str,
    gcp_credentials: GcpCredentials,
    dest_blob: Optional[str] = None,
    timeout: Union[float, Tuple[float, float]] = 60,
    project: Optional[str] = None,
    **copy_kwargs: Dict[str, Any],
) -> str:
    """
    Copies data from one Google Cloud Storage bucket to another,
    without downloading it locally.

    Args:
        source_bucket: Source bucket name.
        dest_bucket: Destination bucket name.
        source_blob: Source blob name.
        gcp_credentials: Credentials to use for authentication with GCP.
        dest_blob: Destination blob name; if not provided, defaults to source_blob.
        timeout: The number of seconds the transport should wait
            for the server response.
            Can also be passed as a tuple (connect_timeout, read_timeout).
        project: Name of the project to use; overrides the
            gcp_credentials project if provided.
        **copy_kwargs: Additional keyword arguments to pass to
            `Bucket.copy_blob`.

    Returns:
        Destination blob name.

    Example:
        Copies blob from one bucket to another.
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.cloud_storage import cloud_storage_copy_blob

        @flow()
        def example_cloud_storage_copy_blob_flow():
            gcp_credentials = GcpCredentials(
                service_account_file="/path/to/service/account/keyfile.json")
            blob = cloud_storage_copy_blob(
                "source_bucket",
                "dest_bucket",
                "source_blob",
                gcp_credentials
            )
            return blob

        example_cloud_storage_copy_blob_flow()
        ```
    """
    logger = get_run_logger()
    logger.info(
        "Copying blob named %s from the %s bucket to the %s bucket",
        source_blob,
        source_bucket,
        dest_bucket,
    )

    source_bucket_obj = await _get_bucket(
        source_bucket, gcp_credentials, project=project
    )

    dest_bucket_obj = await _get_bucket(dest_bucket, gcp_credentials, project=project)
    if dest_blob is None:
        dest_blob = source_blob

    source_blob_obj = source_bucket_obj.blob(source_blob)
    await run_sync_in_worker_thread(
        source_bucket_obj.copy_blob,
        blob=source_blob_obj,
        destination_bucket=dest_bucket_obj,
        new_name=dest_blob,
        timeout=timeout,
        **copy_kwargs,
    )
    return dest_blob
````
````python
@task
async def cloud_storage_create_bucket(
    bucket: str,
    gcp_credentials: GcpCredentials,
    project: Optional[str] = None,
    location: Optional[str] = None,
    **create_kwargs: Dict[str, Any],
) -> str:
    """
    Creates a bucket.

    Args:
        bucket: Name of the bucket.
        gcp_credentials: Credentials to use for authentication with GCP.
        project: Name of the project to use; overrides the
            gcp_credentials project if provided.
        location: Location of the bucket.
        **create_kwargs: Additional keyword arguments to pass to
            `client.create_bucket`.

    Returns:
        The bucket name.

    Example:
        Creates a bucket named "prefect".
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.cloud_storage import cloud_storage_create_bucket

        @flow()
        def example_cloud_storage_create_bucket_flow():
            gcp_credentials = GcpCredentials(
                service_account_file="/path/to/service/account/keyfile.json")
            bucket = cloud_storage_create_bucket("prefect", gcp_credentials)

        example_cloud_storage_create_bucket_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Creating %s bucket", bucket)

    client = gcp_credentials.get_cloud_storage_client(project=project)
    await run_sync_in_worker_thread(
        client.create_bucket, bucket, location=location, **create_kwargs
    )
    return bucket
````
````python
@task
async def cloud_storage_download_blob_as_bytes(
    bucket: str,
    blob: str,
    gcp_credentials: GcpCredentials,
    chunk_size: Optional[int] = None,
    encryption_key: Optional[str] = None,
    timeout: Union[float, Tuple[float, float]] = 60,
    project: Optional[str] = None,
    **download_kwargs: Dict[str, Any],
) -> bytes:
    """
    Downloads a blob as bytes.

    Args:
        bucket: Name of the bucket.
        blob: Name of the Cloud Storage blob.
        gcp_credentials: Credentials to use for authentication with GCP.
        chunk_size (int, optional): The size of a chunk of data whenever
            iterating (in bytes). This must be a multiple of 256 KB
            per the API specification.
        encryption_key: An encryption key.
        timeout: The number of seconds the transport should wait
            for the server response.
            Can also be passed as a tuple (connect_timeout, read_timeout).
        project: Name of the project to use; overrides the
            gcp_credentials project if provided.
        **download_kwargs: Additional keyword arguments to pass to
            `Blob.download_as_bytes`.

    Returns:
        A bytes or string representation of the blob object.

    Example:
        Downloads blob from bucket.
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.cloud_storage import cloud_storage_download_blob_as_bytes

        @flow()
        def example_cloud_storage_download_blob_flow():
            gcp_credentials = GcpCredentials(
                service_account_file="/path/to/service/account/keyfile.json")
            contents = cloud_storage_download_blob_as_bytes(
                "bucket", "blob", gcp_credentials)
            return contents

        example_cloud_storage_download_blob_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Downloading blob named %s from the %s bucket", blob, bucket)

    bucket_obj = await _get_bucket(bucket, gcp_credentials, project=project)
    blob_obj = bucket_obj.blob(
        blob, chunk_size=chunk_size, encryption_key=encryption_key
    )

    contents = await run_sync_in_worker_thread(
        blob_obj.download_as_bytes, timeout=timeout, **download_kwargs
    )
    return contents
````
````python
@task
async def cloud_storage_download_blob_to_file(
    bucket: str,
    blob: str,
    path: Union[str, Path],
    gcp_credentials: GcpCredentials,
    chunk_size: Optional[int] = None,
    encryption_key: Optional[str] = None,
    timeout: Union[float, Tuple[float, float]] = 60,
    project: Optional[str] = None,
    **download_kwargs: Dict[str, Any],
) -> Union[str, Path]:
    """
    Downloads a blob to a file path.

    Args:
        bucket: Name of the bucket.
        blob: Name of the Cloud Storage blob.
        path: Downloads the contents to the provided file path;
            if the path is a directory, automatically joins the blob name.
        gcp_credentials: Credentials to use for authentication with GCP.
        chunk_size (int, optional): The size of a chunk of data whenever
            iterating (in bytes). This must be a multiple of 256 KB
            per the API specification.
        encryption_key: An encryption key.
        timeout: The number of seconds the transport should wait
            for the server response.
            Can also be passed as a tuple (connect_timeout, read_timeout).
        project: Name of the project to use; overrides the
            gcp_credentials project if provided.
        **download_kwargs: Additional keyword arguments to pass to
            `Blob.download_to_filename`.

    Returns:
        The path to the blob object.

    Example:
        Downloads blob from bucket.
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.cloud_storage import cloud_storage_download_blob_to_file

        @flow()
        def example_cloud_storage_download_blob_flow():
            gcp_credentials = GcpCredentials(
                service_account_file="/path/to/service/account/keyfile.json")
            path = cloud_storage_download_blob_to_file(
                "bucket", "blob", "file_path", gcp_credentials)
            return path

        example_cloud_storage_download_blob_flow()
        ```
    """
    logger = get_run_logger()
    logger.info(
        "Downloading blob named %s from the %s bucket to %s", blob, bucket, path
    )

    bucket_obj = await _get_bucket(bucket, gcp_credentials, project=project)
    blob_obj = bucket_obj.blob(
        blob, chunk_size=chunk_size, encryption_key=encryption_key
    )

    if os.path.isdir(path):
        if isinstance(path, Path):
            path = path.joinpath(blob)  # keep as Path if Path is passed
        else:
            path = os.path.join(path, blob)  # keep as str if a str is passed

    await run_sync_in_worker_thread(
        blob_obj.download_to_filename, path, timeout=timeout, **download_kwargs
    )
    return path
````
`cloud_storage_upload_blob_from_file` uploads a blob from a file path or a
file-like object. Passing a file-like object is useful when the data was
downloaded from the web: it can bypass writing to disk and upload directly
to Cloud Storage (a sketch follows the source below).
````python
@task
async def cloud_storage_upload_blob_from_file(
    file: Union[str, Path, BytesIO],
    bucket: str,
    blob: str,
    gcp_credentials: GcpCredentials,
    content_type: Optional[str] = None,
    chunk_size: Optional[int] = None,
    encryption_key: Optional[str] = None,
    timeout: Union[float, Tuple[float, float]] = 60,
    project: Optional[str] = None,
    **upload_kwargs: Dict[str, Any],
) -> str:
    """
    Uploads a blob from file path or file-like object. Usage for passing in
    file-like object is if the data was downloaded from the web;
    can bypass writing to disk and directly upload to Cloud Storage.

    Args:
        file: Path to data or file like object to upload.
        bucket: Name of the bucket.
        blob: Name of the Cloud Storage blob.
        gcp_credentials: Credentials to use for authentication with GCP.
        content_type: Type of content being uploaded.
        chunk_size: The size of a chunk of data whenever iterating (in bytes).
            This must be a multiple of 256 KB per the API specification.
        encryption_key: An encryption key.
        timeout: The number of seconds the transport should wait
            for the server response.
            Can also be passed as a tuple (connect_timeout, read_timeout).
        project: Name of the project to use; overrides the
            gcp_credentials project if provided.
        **upload_kwargs: Additional keyword arguments to pass to
            `Blob.upload_from_file` or `Blob.upload_from_filename`.

    Returns:
        The blob name.

    Example:
        Uploads blob to bucket.
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.cloud_storage import cloud_storage_upload_blob_from_file

        @flow()
        def example_cloud_storage_upload_blob_from_file_flow():
            gcp_credentials = GcpCredentials(
                service_account_file="/path/to/service/account/keyfile.json")
            blob = cloud_storage_upload_blob_from_file(
                "/path/somewhere", "bucket", "blob", gcp_credentials)
            return blob

        example_cloud_storage_upload_blob_from_file_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Uploading blob named %s to the %s bucket", blob, bucket)

    bucket_obj = await _get_bucket(bucket, gcp_credentials, project=project)
    blob_obj = bucket_obj.blob(
        blob, chunk_size=chunk_size, encryption_key=encryption_key
    )

    if isinstance(file, BytesIO):
        await run_sync_in_worker_thread(
            blob_obj.upload_from_file,
            file,
            content_type=content_type,
            timeout=timeout,
            **upload_kwargs,
        )
    else:
        await run_sync_in_worker_thread(
            blob_obj.upload_from_filename,
            file,
            content_type=content_type,
            timeout=timeout,
            **upload_kwargs,
        )
    return blob
````
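The docstring example covers the file-path case; as mentioned above, a file-like object can skip the disk entirely. A hypothetical sketch of that pattern (the URL, bucket, and blob names are placeholders, and `httpx` is used purely for illustration):

```python
from io import BytesIO

import httpx
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_upload_blob_from_file


@flow
def upload_web_data_flow():
    gcp_credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json"
    )
    # fetch bytes from the web and upload without writing to disk
    data = httpx.get("https://example.com/data.csv").content
    blob = cloud_storage_upload_blob_from_file(
        BytesIO(data), "bucket", "data.csv", gcp_credentials
    )
    return blob
```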
````python
@task
async def cloud_storage_upload_blob_from_string(
    data: Union[str, bytes],
    bucket: str,
    blob: str,
    gcp_credentials: GcpCredentials,
    content_type: Optional[str] = None,
    chunk_size: Optional[int] = None,
    encryption_key: Optional[str] = None,
    timeout: Union[float, Tuple[float, float]] = 60,
    project: Optional[str] = None,
    **upload_kwargs: Dict[str, Any],
) -> str:
    """
    Uploads a blob from a string or bytes representation of data.

    Args:
        data: String or bytes representation of data to upload.
        bucket: Name of the bucket.
        blob: Name of the Cloud Storage blob.
        gcp_credentials: Credentials to use for authentication with GCP.
        content_type: Type of content being uploaded.
        chunk_size: The size of a chunk of data whenever iterating (in bytes).
            This must be a multiple of 256 KB per the API specification.
        encryption_key: An encryption key.
        timeout: The number of seconds the transport should wait
            for the server response.
            Can also be passed as a tuple (connect_timeout, read_timeout).
        project: Name of the project to use; overrides the
            gcp_credentials project if provided.
        **upload_kwargs: Additional keyword arguments to pass to
            `Blob.upload_from_string`.

    Returns:
        The blob name.

    Example:
        Uploads blob to bucket.
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.cloud_storage import cloud_storage_upload_blob_from_string

        @flow()
        def example_cloud_storage_upload_blob_from_string_flow():
            gcp_credentials = GcpCredentials(
                service_account_file="/path/to/service/account/keyfile.json")
            blob = cloud_storage_upload_blob_from_string(
                "data", "bucket", "blob", gcp_credentials)
            return blob

        example_cloud_storage_upload_blob_from_string_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Uploading blob named %s to the %s bucket", blob, bucket)

    bucket_obj = await _get_bucket(bucket, gcp_credentials, project=project)
    blob_obj = bucket_obj.blob(
        blob, chunk_size=chunk_size, encryption_key=encryption_key
    )

    await run_sync_in_worker_thread(
        blob_obj.upload_from_string,
        data,
        content_type=content_type,
        timeout=timeout,
        **upload_kwargs,
    )
    return blob
````