Async¶
fsspec supports asynchronous operations on certain implementations. This
allows for concurrent calls within bulk operations such as cat (fetch
the contents of many files at once) even from normal code, and for the direct
use of fsspec in async code without blocking.
Async implementations derive from the class fsspec.asyn.AsyncFileSystem.
The class attribute async_impl can be used to test whether an
implementation is async or not.
AsyncFileSystem contains async def coroutine versions of the methods of
AbstractFileSystem. By convention, these methods are prefixed with “_”
to indicate that they are not to be called directly in normal code, only
when you know what you are doing. In most cases, the code is identical or
slightly modified by replacing sync calls with await calls to async
functions.
The only async implementation built into fsspec is HTTPFileSystem.
Synchronous API¶
The methods of AbstractFileSystem are available and can be called from
normal code. They call and wait on the corresponding async function. The
work is carried out in a separate thread, so if there are many fsspec
operations in flight at once, launched from many threads, they will still
all be processed on the same IO-dedicated thread.
Most users need not be aware that their code is running async.
Note that the sync functions are wrapped using sync_wrapper, which
copies the docstrings from AbstractFileSystem, unless they are
explicitly given in the implementation.
Example:
import fsspec

# url1, url2, url3 are placeholder URL strings
fs = fsspec.filesystem("http")
out = fs.cat([url1, url2, url3])  # fetches data concurrently
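The dispatch described in this section, where blocking calls from many threads are all served by one IO-dedicated thread, can be illustrated with the standard library alone. This is a minimal sketch and not fsspec's own code: the names io_loop, _fetch, fetch and worker are hypothetical, and the "IO" is just a short sleep.

```python
import asyncio
import threading

# Hypothetical stand-in for a dedicated IO loop; not fsspec's own code.
io_loop = asyncio.new_event_loop()
io_thread = threading.Thread(
    target=io_loop.run_forever, name="io-thread", daemon=True
)
io_thread.start()


async def _fetch(name):
    """Pretend async IO call; reports which thread actually ran it."""
    await asyncio.sleep(0.01)
    return name, threading.current_thread().name


def fetch(name, timeout=5):
    """Blocking twin of _fetch: submit to the IO loop and wait for the result."""
    future = asyncio.run_coroutine_threadsafe(_fetch(name), io_loop)
    return future.result(timeout)


results = []
results_lock = threading.Lock()


def worker(name):
    out = fetch(name)
    with results_lock:
        results.append(out)


# Launch the blocking call from several ordinary threads at once.
callers = [threading.Thread(target=worker, args=(f"file{i}",)) for i in range(4)]
for t in callers:
    t.start()
for t in callers:
    t.join()

# Collect the names of the threads on which the coroutines ran.
ran_on = {thread_name for _, thread_name in results}
print(ran_on)
```

Each caller blocks only on its own future, while the coroutines themselves interleave on the single loop thread.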
Coroutine batching¶
The various methods which create many coroutines to be passed to the event loop for processing may be batched: submitting a certain number in one go and waiting for them to complete before launching more. This is important to work around local open-file limits (which can be <~100) and not to swamp the heap.
fsspec.asyn._run_coros_in_chunks controls this process, but from the user’s point
of view, there are three ways to affect it. In increasing order of precedence:
- the global variables fsspec.asyn._DEFAULT_BATCH_SIZE and fsspec.asyn._NOFILES_DEFAULT_BATCH_SIZE (for calls involving local files or not, respectively)
- config keys “gather_batch_size” and “nofiles_gather_batch_size”
- the batch_size keyword, accepted by the batch methods of an async filesystem.
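The batching idea described above (submit a fixed number of coroutines, wait for them, then launch more) can be sketched with plain asyncio. This is a simplified illustration under stated assumptions, not the code of fsspec.asyn._run_coros_in_chunks, whose details may differ; run_in_batches and fake_read are hypothetical names.

```python
import asyncio


async def run_in_batches(coros, batch_size):
    """Await coroutines in consecutive groups of at most ``batch_size``."""
    results = []
    for start in range(0, len(coros), batch_size):
        chunk = coros[start:start + batch_size]
        # Only this chunk is in flight; the next one starts after it finishes.
        results.extend(await asyncio.gather(*chunk))
    return results


in_flight = 0  # coroutines currently running
peak = 0       # highest concurrency observed


async def fake_read(i):
    """Pretend IO call that tracks how many calls overlap."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    return i * 2


batched = asyncio.run(
    run_in_batches([fake_read(i) for i in range(10)], batch_size=3)
)
print(batched, peak)
```

Results come back in submission order, and the observed peak concurrency never exceeds the batch size, which is what keeps open-file counts and memory use bounded.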
Using from Async¶
File system instances can be created with asynchronous=True. This
implies that the instantiation is happening within a coroutine, so
the various async methods can be called directly with await, as is
normal in async code.
Note that, because __init__ is a blocking function, any creation
of asynchronous resources will be deferred. You will normally need to
explicitly await a coroutine to create them. Since garbage collection
also happens in blocking code, you may wish to explicitly await
resource destructors too. Example:
import asyncio
import fsspec

async def work_coroutine():
    fs = fsspec.filesystem("http", asynchronous=True)
    session = await fs.set_session()  # creates client
    out = await fs._cat([url1, url2, url3])  # fetches data concurrently
    await session.close()  # explicit destructor

asyncio.run(work_coroutine())
Bring your own loop¶
For the non-asynchronous case, fsspec will normally create an asyncio
event loop on a specific thread. However, the calling application may prefer
IO processes to run on a loop that is already around and running (in another
thread). The loop needs to be asyncio compliant, but does not necessarily need
to be an asyncio.events.AbstractEventLoop. Example:
loop = ... # however a loop was made, running on another thread
fs = fsspec.filesystem("http", loop=loop)
out = fs.cat([url1, url2, url3]) # fetches data concurrently
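How the application creates its loop is left open above. One possible way, shown here as a stdlib-only sketch, is to start a fresh asyncio loop on a daemon thread and stop it explicitly when IO is finished. The coroutine ping is a hypothetical placeholder used only to check that the loop accepts work; the fsspec call is left as a comment.

```python
import asyncio
import threading

# One possible way for an application to own a loop running on another thread.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

# The loop could now be handed over as in the example above, e.g.
# fs = fsspec.filesystem("http", loop=loop)


async def ping():
    """Hypothetical placeholder coroutine."""
    return "pong"


# Check that the loop accepts work submitted from the calling thread.
reply = asyncio.run_coroutine_threadsafe(ping(), loop).result(timeout=5)

# Shut down cleanly once no more IO is needed.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
```

Stopping the loop through call_soon_threadsafe matters because loop methods are generally not safe to call directly from a foreign thread.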
Implementing new backends¶
Async file systems should derive from AsyncFileSystem, and implement the
async def _* coroutines there. These functions will either have sync versions
automatically generated if the name is in the async_methods list, or
can be directly created using sync_wrapper.
from fsspec.asyn import AsyncFileSystem, sync_wrapper

class MyFileSystem(AsyncFileSystem):
    async def _my_method(self):
        ...

    my_method = sync_wrapper(_my_method)
These functions must not call methods or functions which themselves are synced,
but should instead await other coroutines. Calling methods which do not require sync,
such as _strip_protocol, is fine.
Note that __init__ cannot be async, so it might need to allocate async
resources using the sync function, but only if asynchronous=False. If it
is True, you probably need to require the caller to await a coroutine that
creates those resources. Similarly, any destructor (e.g., __del__) will run from normal
code, and possibly after the loop has stopped/closed.
To call sync, you will need to pass the associated event loop, which will be
available as the attribute .loop.
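The constraint on __init__ described above can be illustrated with a small stdlib-only sketch. It does not use fsspec: ToyFileSystem, start_loop and the dictionary standing in for a session are all hypothetical, and a real backend would derive from AsyncFileSystem and use sync with its .loop attribute rather than calling asyncio.run_coroutine_threadsafe directly.

```python
import asyncio
import threading


def start_loop():
    """Hypothetical helper: start an event loop on a daemon thread."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


class ToyFileSystem:
    """Stdlib-only stand-in; a real backend would derive from AsyncFileSystem."""

    def __init__(self, asynchronous=False, loop=None):
        self.asynchronous = asynchronous
        self.session = None
        if not asynchronous:
            # Blocking mode: __init__ may drive the coroutine on the IO loop.
            self.loop = loop if loop is not None else start_loop()
            fut = asyncio.run_coroutine_threadsafe(self.set_session(), self.loop)
            fut.result(timeout=5)
        else:
            # Async mode: the caller must `await fs.set_session()` later.
            self.loop = None

    async def set_session(self):
        """Create the async resource (here just a placeholder dict)."""
        if self.session is None:
            self.session = {"open": True}
        return self.session

    async def _cat_file(self, path):
        if self.session is None:
            raise RuntimeError("call `await fs.set_session()` first")
        return f"contents of {path}".encode()

    def cat_file(self, path):
        """Blocking twin of _cat_file, run on the stored loop."""
        fut = asyncio.run_coroutine_threadsafe(self._cat_file(path), self.loop)
        return fut.result(timeout=5)


# Blocking use: resources were created inside __init__.
blocking_fs = ToyFileSystem()
blocking_out = blocking_fs.cat_file("a.txt")


async def main():
    fs = ToyFileSystem(asynchronous=True)
    await fs.set_session()  # deferred resource creation
    return await fs._cat_file("b.txt")


async_out = asyncio.run(main())
```

Keeping resource creation in a separate coroutine means the same code path serves both modes; only the place where it is driven differs.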
| fsspec.asyn.AsyncFileSystem | Async file operations, default implementations |
| fsspec.asyn.sync | Make loop run coroutine until it returns. |
| fsspec.asyn.sync_wrapper | Given a function, make so can be called in async or blocking contexts |
| fsspec.asyn.get_loop | Create or return the default fsspec IO loop |
- class fsspec.asyn.AsyncFileSystem(*args, **kwargs)¶
Async file operations, default implementations
Passes bulk operations to asyncio.gather for concurrent operation.
Implementations that have concurrent batch operations and/or async methods should inherit from this class instead of AbstractFileSystem. Docstrings are copied from the un-underscored method in AbstractFileSystem, if not given.
- Attributes
| loop | |
| transaction | A context within which files are committed together upon exit |
Methods
| cat(path[, recursive, on_error]) | Fetch (potentially multiple) paths' contents |
| cat_file(path[, start, end]) | Get the content of a file |
| checksum(path) | Unique value for current version of file |
| clear_instance_cache() | Clear the cache of filesystem instances. |
| copy(path1, path2[, recursive, on_error]) | Copy within two locations in the filesystem |
| cp(path1, path2, **kwargs) | Alias of AbstractFileSystem.copy. |
| created(path) | Return the created timestamp of a file as a datetime.datetime |
| current() | Return the most recently created FileSystem |
| delete(path[, recursive, maxdepth]) | Alias of AbstractFileSystem.rm. |
| disk_usage(path[, total, maxdepth]) | Alias of AbstractFileSystem.du. |
| download(rpath, lpath[, recursive]) | Alias of AbstractFileSystem.get. |
| du(path[, total, maxdepth]) | Space used by files within a path |
| end_transaction() | Finish write transaction, non-context version |
| exists(path, **kwargs) | Is there a file at the given path |
| expand_path(path[, recursive, maxdepth]) | Turn one or more globs or directories into a list of all matching paths to files or directories. |
| find(path[, maxdepth, withdirs]) | List all files below path. |
| from_json(blob) | Recreate a filesystem instance from JSON representation |
| get(rpath, lpath[, recursive, callback]) | Copy file(s) to local. |
| get_file(rpath, lpath[, callback]) | Copy single remote file to local |
| get_mapper(root[, check, create]) | Create key/value store based on this file-system |
| glob(path, **kwargs) | Find files by glob-matching. |
| head(path[, size]) | Get the first size bytes from file |
| info(path, **kwargs) | Give details of entry at path |
| invalidate_cache([path]) | Discard any cached directory information |
| isdir(path) | Is this entry directory-like? |
| isfile(path) | Is this entry file-like? |
| lexists(path, **kwargs) | If there is a file at the given path (including broken links) |
| listdir(path[, detail]) | Alias of AbstractFileSystem.ls. |
| ls(path[, detail]) | List objects at path. |
| makedir(path[, create_parents]) | Alias of AbstractFileSystem.mkdir. |
| makedirs(path[, exist_ok]) | Recursively make directories |
| mkdir(path[, create_parents]) | Create directory entry at path |
| mkdirs(path[, exist_ok]) | Alias of AbstractFileSystem.makedirs. |
| modified(path) | Return the modified timestamp of a file as a datetime.datetime |
| move(path1, path2, **kwargs) | Alias of AbstractFileSystem.mv. |
| mv(path1, path2[, recursive, maxdepth]) | Move file(s) from one location to another |
| open(path[, mode, block_size, ...]) | Return a file-like object from the filesystem |
| pipe(path[, value]) | Put value into path |
| pipe_file(path, value, **kwargs) | Set the bytes of given file |
| put(lpath, rpath[, recursive, callback]) | Copy file(s) from local. |
| put_file(lpath, rpath[, callback]) | Copy single file to remote |
| read_block(fn, offset, length[, delimiter]) | Read a block of bytes from |
| rename(path1, path2, **kwargs) | Alias of AbstractFileSystem.mv. |
| rm(path[, recursive, maxdepth]) | Delete files. |
| rm_file(path) | Delete a file |
| rmdir(path) | Remove a directory, if empty |
| sign(path[, expiration]) | Create a signed URL representing the given path |
| size(path) | Size in bytes of file |
| sizes(paths) | Size in bytes of each file in a list of paths |
| start_transaction() | Begin write transaction for deferring files, non-context version |
| stat(path, **kwargs) | Alias of AbstractFileSystem.info. |
| tail(path[, size]) | Get the last size bytes from file |
| to_json() | JSON representation of this filesystem instance |
| touch(path[, truncate]) | Create empty file, or update timestamp |
| ukey(path) | Hash of file properties, to tell if it has changed |
| upload(lpath, rpath[, recursive]) | Alias of AbstractFileSystem.put. |
| walk(path[, maxdepth]) | Return all files below path |
| cat_ranges | |
| cp_file | |
- fsspec.asyn.sync(loop, func, *args, timeout=None, **kwargs)¶
Make loop run coroutine until it returns. Runs in other thread
- fsspec.asyn.sync_wrapper(func, obj=None)¶
Given a function, make so can be called in async or blocking contexts
Leave obj=None if defining within a class. Pass the instance if attaching as an attribute of the instance.