Diffstat (limited to 'src/arrow/docs/source/python/plasma.rst')
-rw-r--r--  src/arrow/docs/source/python/plasma.rst  462
1 files changed, 462 insertions, 0 deletions
diff --git a/src/arrow/docs/source/python/plasma.rst b/src/arrow/docs/source/python/plasma.rst
new file mode 100644
index 000000000..51c7b6eaf
--- /dev/null
+++ b/src/arrow/docs/source/python/plasma.rst
@@ -0,0 +1,462 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements. See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership. The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied. See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+.. currentmodule:: pyarrow
+.. _plasma:
+
+The Plasma In-Memory Object Store
+=================================
+
+.. note::
+
+   At present, Plasma is only supported for use on Linux and macOS.
+
+The Plasma API
+--------------
+
+Starting the Plasma store
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can start the Plasma store by issuing a terminal command similar to the
+following:
+
+.. code-block:: bash
+
+ plasma_store -m 1000000000 -s /tmp/plasma
+
+The ``-m`` flag specifies the size of the store in bytes, and the ``-s`` flag
+specifies the socket that the store will listen on. Thus, the above command
+allows the Plasma store to use up to 1GB of memory, and sets the socket to
+``/tmp/plasma``.
+
+Leave the current terminal window open for as long as the Plasma store should
+keep running. Messages, such as those concerning disconnecting clients, may
+occasionally be printed to the screen. To stop the Plasma store, press
+``Ctrl-C`` in the terminal.
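+
+If you prefer to manage the store from a Python script rather than a dedicated
+terminal, you can launch it as a subprocess. The sketch below assumes
+``plasma_store`` is on your ``PATH``; it is only an illustration, not a pyarrow
+API:
+
+.. code-block:: python
+
+   import subprocess
+   import time
+
+   # Start the store in the background with 1GB of memory.
+   store_proc = subprocess.Popen(
+       ["plasma_store", "-m", "1000000000", "-s", "/tmp/plasma"])
+   time.sleep(1)  # give the store a moment to create the socket
+
+   # ... interact with the store here ...
+
+   store_proc.terminate()  # same effect as pressing Ctrl-C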
+
+Creating a Plasma client
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+To start a Plasma client from Python, call ``plasma.connect`` using the same
+socket name:
+
+.. code-block:: python
+
+ import pyarrow.plasma as plasma
+ client = plasma.connect("/tmp/plasma")
+
+If the following error occurs when running the above Python code, it means
+that either the socket name given is incorrect, or the ``plasma_store`` process
+is not currently running. Check that the Plasma store is still running.
+
+.. code-block:: shell
+
+ >>> client = plasma.connect("/tmp/plasma")
+ Connection to socket failed for pathname /tmp/plasma
+ Could not connect to socket /tmp/plasma
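+
+If the store is launched immediately before connecting (for example as a
+subprocess, as sketched earlier), the socket may not exist yet when
+``plasma.connect`` is first called. A small retry loop can smooth this over.
+``connect_with_retries`` below is a hypothetical helper, and it catches a broad
+``Exception`` because the exact exception type raised on a failed connection is
+not specified here:
+
+.. code-block:: python
+
+   import time
+
+   def connect_with_retries(socket_name, attempts=5, delay=1.0):
+       # Hypothetical helper: retry connecting while the store starts up.
+       for attempt in range(attempts):
+           try:
+               return plasma.connect(socket_name)
+           except Exception:
+               if attempt == attempts - 1:
+                   raise
+               time.sleep(delay)
+
+   client = connect_with_retries("/tmp/plasma")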
+
+
+Object IDs
+^^^^^^^^^^
+
+Each object in the Plasma store should be associated with a unique ID. The
+Object ID then serves as a key that any client can use to retrieve that object
+from the Plasma store. You can form an ``ObjectID`` object from a byte string of
+length 20.
+
+.. code-block:: shell
+
+ # Create an ObjectID.
+ >>> id = plasma.ObjectID(20 * b"a")
+
+ # The character "a" is encoded as 61 in hex.
+ >>> id
+ ObjectID(6161616161616161616161616161616161616161)
+
+The random generation of Object IDs is often good enough to ensure unique IDs.
+You can easily create a helper function that randomly generates object IDs as
+follows:
+
+.. code-block:: python
+
+ import numpy as np
+
+ def random_object_id():
+ return plasma.ObjectID(np.random.bytes(20))
+
+Putting and Getting Python Objects
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Plasma supports two APIs for creating and accessing objects: a high-level
+API that allows storing and retrieving Python objects, and a low-level
+API that allows creating, writing and sealing buffers and operating on
+the binary data directly. This section describes the high-level API.
+
+This is how you can put and get a Python object:
+
+.. code-block:: python
+
+ # Create a python object.
+ object_id = client.put("hello, world")
+
+ # Get the object.
+ client.get(object_id)
+
+This works with all Python objects supported by the Arrow Python object
+serialization.
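+
+For instance, built-in containers and NumPy arrays round-trip through the same
+two calls. The values in the comments below are illustrative:
+
+.. code-block:: python
+
+   import numpy as np
+
+   # Store a dictionary and a NumPy array with the same high-level API.
+   dict_id = client.put({"name": "plasma", "sizes": [1, 2, 3]})
+   array_id = client.put(np.arange(10))
+
+   client.get(dict_id)   # {'name': 'plasma', 'sizes': [1, 2, 3]}
+   client.get(array_id)  # array([0, 1, 2, ..., 9])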
+
+You can also get multiple objects at the same time (which can be more
+efficient since it avoids IPC round trips):
+
+.. code-block:: python
+
+ # Create multiple python objects.
+ object_id1 = client.put(1)
+ object_id2 = client.put(2)
+ object_id3 = client.put(3)
+
+ # Get the objects.
+ client.get([object_id1, object_id2, object_id3])
+
+Furthermore, it is possible to provide a timeout for the get call. If the
+object is not available within the timeout, the special object
+``pyarrow.ObjectNotAvailable`` will be returned.
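+
+A minimal sketch of the timeout behaviour, using a hypothetical ID that is
+intentionally never stored:
+
+.. code-block:: python
+
+   # An ID that no client has put into the store.
+   missing_id = plasma.ObjectID(20 * b"z")
+
+   # Wait at most 100 milliseconds instead of blocking.
+   result = client.get(missing_id, timeout_ms=100)
+   print(result)  # the ObjectNotAvailable sentinel, not a stored value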
+
+Creating an Object Buffer
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Objects are created in Plasma in two stages. First, they are **created**, which
+allocates a buffer for the object. At this point, the client can write to the
+buffer and construct the object within the allocated buffer.
+
+To create an object in Plasma, you need to provide an object ID, as well as
+the object's maximum size in bytes.
+
+.. code-block:: python
+
+ # Create an object buffer.
+ object_id = plasma.ObjectID(20 * b"a")
+ object_size = 1000
+ buffer = memoryview(client.create(object_id, object_size))
+
+ # Write to the buffer.
+ for i in range(1000):
+ buffer[i] = i % 128
+
+When the client is done writing, it **seals** the buffer, making the object
+immutable and available to other Plasma clients.
+
+.. code-block:: python
+
+ # Seal the object. This makes the object immutable and available to other clients.
+ client.seal(object_id)
+
+
+Getting an Object Buffer
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+After an object has been sealed, any client who knows the object ID can get
+the object buffer.
+
+.. code-block:: python
+
+ # Create a different client. Note that this second client could be
+ # created in the same or in a separate, concurrent Python session.
+ client2 = plasma.connect("/tmp/plasma")
+
+ # Get the object in the second client. This blocks until the object has been sealed.
+ object_id2 = plasma.ObjectID(20 * b"a")
+   [buffer2] = client2.get_buffers([object_id2])
+
+If the object has not been sealed yet, then the call to ``client.get_buffers``
+will block until the object has been sealed by the client constructing it.
+Using the ``timeout_ms`` argument to ``get_buffers``, you can specify a timeout
+in milliseconds; once it expires, the call returns and the interpreter regains
+control.
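+
+A minimal sketch of the timeout, again using a hypothetical ID that is never
+created; ``get_buffers`` returns ``None`` in place of objects that did not
+arrive in time:
+
+.. code-block:: python
+
+   # Hypothetical ID that no client ever creates.
+   missing_id = plasma.ObjectID(20 * b"x")
+
+   # Wait at most 100 milliseconds instead of blocking indefinitely.
+   [maybe_buf] = client2.get_buffers([missing_id], timeout_ms=100)
+   print(maybe_buf)  # None, since the object never appeared
+
+Once the object has been sealed, the buffers retrieved by either client expose
+the same bytes: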
+
+.. code-block:: shell
+
+ >>> buffer
+ <memory at 0x7fdbdc96e708>
+ >>> buffer[1]
+ 1
+ >>> buffer2
+ <plasma.plasma.PlasmaBuffer object at 0x7fdbf2770e88>
+ >>> view2 = memoryview(buffer2)
+ >>> view2[1]
+ 1
+ >>> view2[129]
+ 1
+ >>> bytes(buffer[1:4])
+ b'\x01\x02\x03'
+ >>> bytes(view2[1:4])
+ b'\x01\x02\x03'
+
+
+Listing objects in the store
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The objects in the store can be listed in the following way (note that
+this functionality is currently experimental and the concrete representation
+of the object info might change in the future):
+
+.. code-block:: python
+
+ import pyarrow.plasma as plasma
+ import time
+
+ client = plasma.connect("/tmp/plasma")
+
+ client.put("hello, world")
+ # Sleep a little so we get different creation times
+ time.sleep(2)
+ client.put("another object")
+ # Create an object that is not sealed yet
+ object_id = plasma.ObjectID.from_random()
+ client.create(object_id, 100)
+ print(client.list())
+
+ >>> {ObjectID(4cba8f80c54c6d265b46c2cdfcee6e32348b12be): {'construct_duration': 0,
+ >>> 'create_time': 1535223642,
+ >>> 'data_size': 460,
+ >>> 'metadata_size': 0,
+ >>> 'ref_count': 0,
+ >>> 'state': 'sealed'},
+ >>> ObjectID(a7598230b0c26464c9d9c99ae14773ee81485428): {'construct_duration': 0,
+ >>> 'create_time': 1535223644,
+ >>> 'data_size': 460,
+ >>> 'metadata_size': 0,
+ >>> 'ref_count': 0,
+ >>> 'state': 'sealed'},
+ >>> ObjectID(e603ab0c92098ebf08f90bfcea33ff98f6476870): {'construct_duration': -1,
+ >>> 'create_time': 1535223644,
+ >>> 'data_size': 100,
+ >>> 'metadata_size': 0,
+ >>> 'ref_count': 1,
+ >>> 'state': 'created'}}
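+
+The returned dictionary can be post-processed like any other Python dictionary;
+for example, to total the bytes held by sealed objects (a small sketch using
+only the fields shown above):
+
+.. code-block:: python
+
+   info = client.list()
+   sealed = {oid: meta for oid, meta in info.items()
+             if meta["state"] == "sealed"}
+   total_bytes = sum(meta["data_size"] for meta in sealed.values())
+   print(len(sealed), "sealed objects using", total_bytes, "bytes")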
+
+
+Using Arrow and Pandas with Plasma
+----------------------------------
+
+Storing Arrow Objects in Plasma
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To store an Arrow object in Plasma, we must first **create** the object and then
+**seal** it. However, Arrow objects such as ``Tensors`` may be more complicated
+to write than simple binary data.
+
+To create the object in Plasma, you still need an ``ObjectID`` and a size to
+pass in. To find out the size of your Arrow object, you can use the pyarrow
+API, such as ``pyarrow.ipc.get_tensor_size``.
+
+.. code-block:: python
+
+ import numpy as np
+ import pyarrow as pa
+
+ # Create a pyarrow.Tensor object from a numpy random 2-dimensional array
+ data = np.random.randn(10, 4)
+ tensor = pa.Tensor.from_numpy(data)
+
+ # Create the object in Plasma
+ object_id = plasma.ObjectID(np.random.bytes(20))
+ data_size = pa.ipc.get_tensor_size(tensor)
+ buf = client.create(object_id, data_size)
+
+To write the Arrow ``Tensor`` object into the buffer, wrap the Plasma buffer in
+a ``pyarrow.FixedSizeBufferWriter`` object. A ``pyarrow.FixedSizeBufferWriter``
+is a sink suitable for Arrow's ``pyarrow.ipc.write_tensor``:
+
+.. code-block:: python
+
+ # Write the tensor into the Plasma-allocated buffer
+ stream = pa.FixedSizeBufferWriter(buf)
+ pa.ipc.write_tensor(tensor, stream) # Writes tensor's 552 bytes to Plasma stream
+
+To finish storing the Arrow object in Plasma, call ``seal``:
+
+.. code-block:: python
+
+ # Seal the Plasma object
+ client.seal(object_id)
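+
+The create, write and seal steps above can be combined into a small convenience
+function. The sketch below is built only from the calls already shown;
+``put_tensor`` is an illustrative helper, not part of the pyarrow API:
+
+.. code-block:: python
+
+   def put_tensor(client, tensor):
+       # Allocate, write and seal a pyarrow.Tensor; return its ObjectID.
+       object_id = plasma.ObjectID(np.random.bytes(20))
+       buf = client.create(object_id, pa.ipc.get_tensor_size(tensor))
+       stream = pa.FixedSizeBufferWriter(buf)
+       pa.ipc.write_tensor(tensor, stream)
+       client.seal(object_id)
+       return object_id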
+
+Getting Arrow Objects from Plasma
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To read the object, first retrieve it as a ``PlasmaBuffer`` using its object ID.
+
+.. code-block:: python
+
+ # Get the arrow object by ObjectID.
+ [buf2] = client.get_buffers([object_id])
+
+To convert the ``PlasmaBuffer`` back into an Arrow ``Tensor``, first create a
+pyarrow ``BufferReader`` object from it. You can then pass the ``BufferReader``
+into ``pyarrow.ipc.read_tensor`` to reconstruct the Arrow ``Tensor`` object:
+
+.. code-block:: python
+
+ # Reconstruct the Arrow tensor object.
+ reader = pa.BufferReader(buf2)
+ tensor2 = pa.ipc.read_tensor(reader)
+
+Finally, you can use the tensor's ``to_numpy`` method to convert the Arrow
+object back into numpy data:
+
+.. code-block:: python
+
+ # Convert back to numpy
+ array = tensor2.to_numpy()
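+
+As a sanity check, assuming the original ``data`` array from the storing
+example is still in scope, you can verify that the round trip preserved the
+values:
+
+.. code-block:: python
+
+   import numpy as np
+
+   # The reconstructed array should match the tensor that was stored.
+   np.testing.assert_array_equal(array, data)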
+
+Storing Pandas DataFrames in Plasma
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Storing a Pandas ``DataFrame`` still follows the **create** then **seal**
+process of storing an object in the Plasma store; however, one cannot directly
+write the ``DataFrame`` to Plasma with Pandas alone. Plasma also needs to know
+the size of the ``DataFrame`` in order to allocate a buffer for it.
+
+See :ref:`pandas_interop` for more information on using Arrow with Pandas.
+
+You can create the pyarrow equivalent of a Pandas ``DataFrame`` by using
+``pyarrow.RecordBatch.from_pandas`` to convert it to a ``RecordBatch``.
+
+.. code-block:: python
+
+ import pyarrow as pa
+ import pandas as pd
+
+ # Create a Pandas DataFrame
+ d = {'one' : pd.Series([1., 2., 3.], index=['a', 'b', 'c']),
+ 'two' : pd.Series([1., 2., 3., 4.], index=['a', 'b', 'c', 'd'])}
+ df = pd.DataFrame(d)
+
+ # Convert the Pandas DataFrame into a PyArrow RecordBatch
+ record_batch = pa.RecordBatch.from_pandas(df)
+
+Creating the Plasma object requires an ``ObjectID`` and the size of the
+data. Now that we have converted the Pandas ``DataFrame`` into a PyArrow
+``RecordBatch``, use the ``MockOutputStream`` to determine the
+size of the Plasma object.
+
+.. code-block:: python
+
+ # Create the Plasma object from the PyArrow RecordBatch. Most of the work here
+ # is done to determine the size of buffer to request from the object store.
+ object_id = plasma.ObjectID(np.random.bytes(20))
+ mock_sink = pa.MockOutputStream()
+ with pa.RecordBatchStreamWriter(mock_sink, record_batch.schema) as stream_writer:
+ stream_writer.write_batch(record_batch)
+ data_size = mock_sink.size()
+ buf = client.create(object_id, data_size)
+
+The DataFrame can now be written to the buffer as follows.
+
+.. code-block:: python
+
+ # Write the PyArrow RecordBatch to Plasma
+ stream = pa.FixedSizeBufferWriter(buf)
+ with pa.RecordBatchStreamWriter(stream, record_batch.schema) as stream_writer:
+ stream_writer.write_batch(record_batch)
+
+Finally, seal the finished object for use by all clients:
+
+.. code-block:: python
+
+ # Seal the Plasma object
+ client.seal(object_id)
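+
+As with tensors, the whole procedure can be wrapped in a small helper. The
+sketch below only combines the steps shown above; ``put_df`` is illustrative
+and not part of the pyarrow API:
+
+.. code-block:: python
+
+   def put_df(client, df):
+       # Convert, size, allocate, write and seal a DataFrame; return its ObjectID.
+       record_batch = pa.RecordBatch.from_pandas(df)
+       mock_sink = pa.MockOutputStream()
+       with pa.RecordBatchStreamWriter(mock_sink, record_batch.schema) as writer:
+           writer.write_batch(record_batch)
+       object_id = plasma.ObjectID(np.random.bytes(20))
+       buf = client.create(object_id, mock_sink.size())
+       stream = pa.FixedSizeBufferWriter(buf)
+       with pa.RecordBatchStreamWriter(stream, record_batch.schema) as writer:
+           writer.write_batch(record_batch)
+       client.seal(object_id)
+       return object_id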
+
+Getting Pandas DataFrames from Plasma
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Since we store the Pandas DataFrame as a PyArrow ``RecordBatch`` object,
+to get the object back from the Plasma store, we follow similar steps
+to those specified in `Getting Arrow Objects from Plasma`_.
+
+We first have to convert the ``PlasmaBuffer`` returned from
+``client.get_buffers`` into an Arrow ``BufferReader`` object.
+
+.. code-block:: python
+
+ # Fetch the Plasma object
+ [data] = client.get_buffers([object_id]) # Get PlasmaBuffer from ObjectID
+ buffer = pa.BufferReader(data)
+
+From the ``BufferReader``, we can create a ``RecordBatchStreamReader`` in Arrow
+to reconstruct the stored PyArrow ``RecordBatch`` object.
+
+.. code-block:: python
+
+ # Convert object back into an Arrow RecordBatch
+ reader = pa.RecordBatchStreamReader(buffer)
+ record_batch = reader.read_next_batch()
+
+The last step is to convert the PyArrow ``RecordBatch`` object back into
+the original Pandas ``DataFrame`` structure.
+
+.. code-block:: python
+
+ # Convert back into Pandas
+ result = record_batch.to_pandas()
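+
+These steps can likewise be collected into a counterpart of the hypothetical
+``put_df`` helper sketched earlier; ``get_df`` below is illustrative, not a
+pyarrow API:
+
+.. code-block:: python
+
+   def get_df(client, object_id):
+       # Fetch the buffer and rebuild the DataFrame from the RecordBatch stream.
+       [data] = client.get_buffers([object_id])
+       reader = pa.RecordBatchStreamReader(pa.BufferReader(data))
+       return reader.read_next_batch().to_pandas()
+
+   # Round trip: the result should equal the original DataFrame.
+   print(get_df(client, object_id).equals(df))  # expected: True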
+
+Using Plasma with Huge Pages
+----------------------------
+
+On Linux it is possible to use the Plasma store with huge pages for increased
+throughput. You first need to mount a huge page file system and reserve huge
+pages with
+
+.. code-block:: shell
+
+ sudo mkdir -p /mnt/hugepages
+ gid=`id -g`
+ uid=`id -u`
+ sudo mount -t hugetlbfs -o uid=$uid -o gid=$gid none /mnt/hugepages
+ sudo bash -c "echo $gid > /proc/sys/vm/hugetlb_shm_group"
+ sudo bash -c "echo 20000 > /proc/sys/vm/nr_hugepages"
+
+Note that you only need root access to create the file system, not for
+running the object store. You can then start the Plasma store with the ``-d``
+flag for the mount point of the huge page file system and the ``-h`` flag
+which indicates that huge pages are activated:
+
+.. code-block:: shell
+
+ plasma_store -s /tmp/plasma -m 10000000000 -d /mnt/hugepages -h
+
+You can test this with the following script:
+
+.. code-block:: python
+
+ import numpy as np
+ import pyarrow as pa
+ import pyarrow.plasma as plasma
+ import time
+
+ client = plasma.connect("/tmp/plasma")
+
+ data = np.random.randn(100000000)
+ tensor = pa.Tensor.from_numpy(data)
+
+ object_id = plasma.ObjectID(np.random.bytes(20))
+ buf = client.create(object_id, pa.ipc.get_tensor_size(tensor))
+
+ stream = pa.FixedSizeBufferWriter(buf)
+ stream.set_memcopy_threads(4)
+ a = time.time()
+ pa.ipc.write_tensor(tensor, stream)
+ print("Writing took ", time.time() - a)