Bryan Cutler (BryanCutler)
IBM Spark Technology Center
- San Francisco
- http://BryanCutler.github.io
tf_arrow_blog_p11.py
import tensorflow as tf
import tensorflow_io.arrow as arrow_io


def make_remote_dataset(endpoint):
    """Make a TensorFlow Arrow Dataset that reads from a remote Arrow stream."""
    # Create the Arrow Dataset from a remote host serving a stream
    ds = arrow_io.ArrowStreamDataset(
        [endpoint],
        columns=(0, 1, 2),
        output_types=(tf.int64, tf.float64, tf.float64),
        batch_mode='auto')
    return ds  # assumed return; the gist preview cuts off here
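A hypothetical call, pairing this with the serve_csv_data server below; the address is a placeholder and the exact 'host:port' endpoint format is an assumption, not confirmed by the preview:

# Hypothetical usage; host and port are placeholders
remote_ds = make_remote_dataset('10.0.0.1:8888')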
tf_arrow_blog_pt10.py
import socket


def serve_csv_data(ip_addr, port_num, directory):
    """
    Create a socket and serve Arrow record batches as a stream read from the
    given directory containing CSV files.
    """
    # Create the socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind((ip_addr, port_num))
    sock.listen(1)
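    # Sketch of an assumed continuation -- the preview cuts off above. This
    # accepts one client and streams batches with pyarrow's
    # RecordBatchStreamWriter (requires `import pyarrow`); it is a minimal
    # sketch, not the author's exact code.
    conn, _ = sock.accept()
    outfile = conn.makefile(mode='wb')
    writer = None
    for batch in read_and_process_dir(directory):
        if writer is None:
            # Open an Arrow stream writer using the schema of the first batch
            writer = pyarrow.RecordBatchStreamWriter(outfile, batch.schema)
        writer.write_batch(batch)
    if writer is not None:
        writer.close()
    outfile.close()
    conn.close()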
tf_arrow_blog_pt9.py
import os


def read_and_process_dir(directory):
    """Read a directory of CSV files and yield processed Arrow batches."""
    for f in os.listdir(directory):
        if f.endswith(".csv"):
            filename = os.path.join(directory, f)
            for batch in read_and_process(filename):
                yield batch
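Taken together with serve_csv_data above, a hypothetical invocation of the serving side; the address, port, and directory are placeholders:

# Hypothetical invocation; all arguments are placeholders
serve_csv_data('127.0.0.1', 8888, '/path/to/csv_dir')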
tf_arrow_blog_pt8.py
ds = make_local_dataset(filename)
model = model_fit(ds)
print("Fit model with weights: {}".format(model.get_weights()))
# Fit model with weights:
# [array([[0.7793554 ], [0.61216295]], dtype=float32),
#  array([0.03328196], dtype=float32)]
tf_arrow_blog_pt7.py
def make_local_dataset(filename):
    """Make a TensorFlow Arrow Dataset that reads from a local CSV file."""
    # Read the local file and get a record batch iterator
    batch_iter = read_and_process(filename)
    # Create the Arrow Dataset as a stream from local iterator of record batches
    ds = arrow_io.ArrowStreamDataset.from_record_batches(
        batch_iter,
        output_types=(tf.int64, tf.float64, tf.float64),
        batch_mode='keep_remainder')  # closing arguments assumed; the preview truncates here
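    # Sketch of an assumed continuation: the Dense(1, input_shape=(2,)) model
    # in model_fit below expects a single 2-column feature tensor, so the
    # three columns (assumed roles: label, x0, x1) presumably get combined
    # before the dataset is returned
    ds = ds.map(lambda label, x0, x1: (tf.stack([x0, x1], axis=1), label))
    return ds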
tf_arrow_blog_pt6.py
import pyarrow
import pyarrow.csv


def read_and_process(filename):
    """Read the given CSV file and yield processed Arrow batches."""
    # Read a CSV file into an Arrow Table with threading enabled and
    # set block_size in bytes to break the file into chunks for granularity,
    # which determines the number of batches in the resulting pyarrow.Table
    opts = pyarrow.csv.ReadOptions(use_threads=True, block_size=4096)
    table = pyarrow.csv.read_csv(filename, opts)
    # Fit the feature transform
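    # Sketch of an assumed continuation -- the preview ends at the
    # feature-transform comment above and the transform itself is omitted.
    # The docstring implies the function ends by yielding the Table's
    # record batches, roughly:
    for batch in table.to_batches():
        yield batch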
tf_arrow_blog_pt5.py
import tensorflow as tf


def model_fit(ds):
    """Create and fit a Keras logistic regression model."""
    # Build the Keras model
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Dense(1, input_shape=(2,),
                                    activation='sigmoid'))
    model.compile(optimizer='sgd', loss='mean_squared_error',
                  metrics=['accuracy'])
tf_arrow_blog_pt4.py
import tensorflow_io.arrow as arrow_io

ds = arrow_io.ArrowStreamDataset.from_pandas(
    df,
    batch_size=2,
    preserve_index=False)
tf_arrow_blog_pt3.py
import tensorflow_io.arrow as arrow_io
from pyarrow.feather import write_feather

# Write the Pandas DataFrame to a Feather file
write_feather(df, '/path/to/df.feather')

# Create the dataset with one or more filenames
ds = arrow_io.ArrowFeatherDataset(
    ['/path/to/df.feather'],
    columns=(0, 1, 2),
    output_types=(tf.int64, tf.float64, tf.float64))  # assumed completion; the preview truncates here
tf_arrow_blog_pt2.py
import tensorflow_io.arrow as arrow_io

ds = arrow_io.ArrowDataset.from_pandas(
    df,
    batch_size=2,
    preserve_index=False)

# Make an iterator to the dataset
ds_iter = iter(ds)
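A hypothetical next step, assuming df has the same three columns (a label and two features) used in the other snippets:

# Hypothetical: pull the first batch of two rows from the iterator
(label, x0, x1) = next(ds_iter)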