Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add Binary type #850

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion embulk-core/src/main/java/org/embulk/spi/Column.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 4,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.embulk.spi.type.Type;
import org.embulk.spi.type.BinaryType;
import org.embulk.spi.type.BooleanType;
import org.embulk.spi.type.DoubleType;
import org.embulk.spi.type.LongType;
Expand Down Expand Up @@ -48,7 49,9 @@ public Type getType()

public void visit(ColumnVisitor visitor)
{
if (type instanceof BooleanType) {
if (type instanceof BinaryType) {
visitor.binaryColumn(this);
} else if (type instanceof BooleanType) {
visitor.booleanColumn(this);
} else if (type instanceof LongType) {
visitor.longColumn(this);
Expand Down
6 changes: 6 additions & 0 deletions embulk-core/src/main/java/org/embulk/spi/ColumnVisitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 2,12 @@

public interface ColumnVisitor
{
void binaryColumn(Column column); // TODO: Add default implementation
/*
default void binaryColumn(Column column) {
}
*/

void booleanColumn(Column column);

void longColumn(Column column);
Expand Down
19 changes: 19 additions & 0 deletions embulk-core/src/main/java/org/embulk/spi/Page.java
Original file line number Diff line number Diff line change
@@ -1,11 1,13 @@
package org.embulk.spi;

import java.nio.ByteBuffer;
import java.util.List;
import org.msgpack.value.ImmutableValue;

public class Page
{
private final Buffer buffer;
private List<ByteBuffer> binaryReferences;
private List<String> stringReferences;
private List<ImmutableValue> valueReferences;

Expand All @@ -24,6 26,12 @@ public static Page wrap(Buffer buffer)
return new Page(buffer);
}

public Page setBinaryReferences(List<ByteBuffer> binaryReferences)
{
this.binaryReferences = binaryReferences;
return this;
}

public Page setStringReferences(List<String> values)
{
this.stringReferences = values;
Expand All @@ -36,6 44,12 @@ public Page setValueReferences(List<ImmutableValue> values)
return this;
}

public List<ByteBuffer> getBinaryReferences()
{
// TODO used by mapreduce executor
return binaryReferences;
}

public List<String> getStringReferences()
{
// TODO used by mapreduce executor
Expand All @@ -48,6 62,11 @@ public List<ImmutableValue> getValueReferences()
return valueReferences;
}

public ByteBuffer getBinaryReference(int index)
{
return binaryReferences.get(index).asReadOnlyBuffer();
}

public String getStringReference(int index)
{
return stringReferences.get(index);
Expand Down
50 changes: 50 additions & 0 deletions embulk-core/src/main/java/org/embulk/spi/PageBuilder.java
Original file line number Diff line number Diff line change
@@ -1,6 1,7 @@
package org.embulk.spi;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.List;
import java.util.Arrays;
Expand Down Expand Up @@ -30,6 31,7 @@ public class PageBuilder
private int count;
private int position;
private final byte[] nullBitSet;
private final BiMap<ByteBuffer, Integer> binaryReferences = HashBiMap.create();
private final BiMap<String, Integer> stringReferences = HashBiMap.create();
private List<ImmutableValue> valueReferences = new ArrayList<>();
private int referenceSize;
Expand All @@ -54,6 56,7 @@ private void newBuffer()
this.bufferSlice = Slices.wrappedBuffer(buffer.array(), buffer.offset(), buffer.capacity());
this.count = 0;
this.position = PageFormat.PAGE_HEADER_SIZE;
this.binaryReferences.clear();
this.stringReferences.clear();
this.valueReferences = new ArrayList<>();
this.referenceSize = 0;
Expand All @@ -79,6 82,25 @@ private void clearNull(int columnIndex)
nullBitSet[columnIndex >>> 3] &= ~(1 << (columnIndex & 7));
}

public void setBinary(Column column, ByteBuffer value)
{
setBinary(column.getIndex(), value);
}

public void setBinary(int columnIndex, ByteBuffer value)
{
Integer reuseIndex = binaryReferences.get(value.asReadOnlyBuffer());
if (reuseIndex != null) {
bufferSlice.setInt(getOffset(columnIndex), reuseIndex);
} else {
int index = binaryReferences.size();
binaryReferences.put(value, index);
bufferSlice.setInt(getOffset(columnIndex), index);
referenceSize = value.length * 2 4; // assuming size of char = size of byte * 2 length
}
clearNull(columnIndex);
}

public void setBoolean(Column column, boolean value)
{
// TODO check type?
Expand Down Expand Up @@ -169,6 191,33 @@ private int getOffset(int columnIndex)
return position columnOffsets[columnIndex];
}

private static class BinaryReferenceSortComparator
implements Comparator<Map.Entry<ByteBuffer, Integer>>, Serializable
{
@Override
public int compare(Map.Entry<ByteBuffer, Integer> e1, Map.Entry<ByteBuffer, Integer> e2)
{
return e1.getValue().compareTo(e2.getValue());
}

@Override
public boolean equals(Object obj)
{
return obj instanceof BinaryReferenceSortComparator;
}
}

private List<byte[]> getSortedBinaryReferences()
{
ArrayList<Map.Entry<ByteBuffer, Integer>> s = new ArrayList<>(binaryReferences.entrySet());
Collections.sort(s, new BinaryReferenceSortComparator());
ByteBuffer[] array = new ByteBuffer[s.size()];
for (int i=0; i < array.length; i ) {
array[i] = s.get(i).getKey();
}
return Arrays.asList(array);
}

private static class StringReferenceSortComparator
implements Comparator<Map.Entry<String, Integer>>, Serializable
{
Expand Down Expand Up @@ -222,6 271,7 @@ private void doFlush()

// flush page
Page page = Page.wrap(buffer)
.setBinaryReferences(getSortedBinaryReferences())
.setStringReferences(getSortedStringReferences())
.setValueReferences(valueReferences);
buffer = null;
Expand Down
11 changes: 11 additions & 0 deletions embulk-core/src/main/java/org/embulk/spi/PageReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 66,17 @@ public boolean isNull(int columnIndex)
return (nullBitSet[columnIndex >>> 3] & (1 << (columnIndex & 7))) != 0;
}

public ByteBuffer getBinary(Column column)
{
return getBinary(column.getIndex());
}

public ByteBuffer getBinary(int columnIndex)
{
int index = pageSlice.getInt(getOffset(columnIndex));
return page.getBinaryReference(index).asReadOnlyBuffer();
}

public boolean getBoolean(Column column)
{
// TODO check type?
Expand Down
14 changes: 14 additions & 0 deletions embulk-core/src/main/java/org/embulk/spi/type/BinaryType.java
Original file line number Diff line number Diff line change
@@ -0,0 1,14 @@
package org.embulk.spi.type;

import java.nio.ByteBuffer;

public class BinaryType
extends AbstractType
{
static final BinaryType BINARY = new BinaryType();

private BinaryType()
{
super("binary", ByteBuffer.class, 4);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 15,7 @@ public class TypeDeserializer

static {
ImmutableMap.Builder<String, Type> builder = ImmutableMap.builder();
builder.put(BinaryType.BINARY.getName(), BinaryType.BINARY);
builder.put(BooleanType.BOOLEAN.getName(), BooleanType.BOOLEAN);
builder.put(LongType.LONG.getName(), LongType.LONG);
builder.put(DoubleType.DOUBLE.getName(), DoubleType.DOUBLE);
Expand Down
2 changes: 2 additions & 0 deletions embulk-core/src/main/java/org/embulk/spi/type/Types.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 2,8 @@

public class Types
{
public static final BinaryType BINARY = BinaryType.BINARY;

public static final BooleanType BOOLEAN = BooleanType.BOOLEAN;

public static final LongType LONG = LongType.LONG;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 1,17 @@
package org.embulk.spi.util;

import java.nio.ByteBuffer;
import org.joda.time.DateTimeZone;
import org.embulk.config.ConfigSource;
import org.embulk.spi.type.Type;
import org.embulk.spi.type.BinaryType;
import org.embulk.spi.type.BooleanType;
import org.embulk.spi.type.LongType;
import org.embulk.spi.type.DoubleType;
import org.embulk.spi.type.StringType;
import org.embulk.spi.type.TimestampType;
import org.embulk.spi.type.JsonType;
import org.embulk.spi.util.dynamic.BinaryColumnSetter;
import org.embulk.spi.util.dynamic.BooleanColumnSetter;
import org.embulk.spi.util.dynamic.LongColumnSetter;
import org.embulk.spi.util.dynamic.DoubleColumnSetter;
Expand Down Expand Up @@ -45,7 48,11 @@ public static DefaultValueSetter nullDefaultValue()
public DynamicColumnSetter newColumnSetter(PageBuilder pageBuilder, Column column)
{
Type type = column.getType();
if (type instanceof BooleanType) {
if (type instanceof BinaryType) {
final TimestampFormatter formatter = new TimestampFormatter(
getTimestampFormat(column).getFormat(), getTimeZone(column));
return new BinaryColumnSetter(pageBuilder, column, defaultValue, formatter);
} else if (type instanceof BooleanType) {
return new BooleanColumnSetter(pageBuilder, column, defaultValue);
} else if (type instanceof LongType) {
return new LongColumnSetter(pageBuilder, column, defaultValue);
Expand Down
15 changes: 15 additions & 0 deletions embulk-core/src/main/java/org/embulk/spi/util/PagePrinter.java
Original file line number Diff line number Diff line change
@@ -1,5 1,6 @@
package org.embulk.spi.util;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.ArrayList;
import org.embulk.spi.time.Timestamp;
Expand Down Expand Up @@ -74,6 75,20 @@ public ToStringColumnVisitor(PageReader reader)
this.reader = reader;
}

@Override
public void binaryColumn(Column column)
{
final StringBuilder stringBuilder = new StringBuilder();
final ByteBuffer duplicatedBuffer = reader.getBinary(column).duplicate().rewind();
while (duplicatedBuffer.hasRemaining()) {
if (stringBuilder.length() > 0) {
stringBuilder.append(":");
}
stringBuilder.append(String.format("X", duplicatedBuffer.get()));
}
string = stringBuilder.toString();
}

public void booleanColumn(Column column)
{
string = Boolean.toString(reader.getBoolean(column));
Expand Down
10 changes: 10 additions & 0 deletions embulk-core/src/main/java/org/embulk/spi/util/Pages.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 58,16 @@ public ObjectColumnVisitor(PageReader record)

public abstract void visit(Column column, Object obj);

@Override
public void binaryColumn(Column column)
{
if (record.isNull(column)) {
visit(column, null);
} else {
visit(column, record.getBinary(column));
}
}

@Override
public void booleanColumn(Column column)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 14,7 @@
import org.embulk.spi.time.Timestamp;
import org.embulk.spi.json.RubyValueApi;
import org.msgpack.value.Value;
import java.nio.ByteBuffer;

public abstract class AbstractDynamicColumnSetter
implements DynamicColumnSetter
Expand All @@ -40,6 41,11 @@ protected AbstractDynamicColumnSetter(PageBuilder pageBuilder, Column column,

public abstract void set(String value);

public void set(ByteBuffer value)
{
// Default implementation for older plugins
}

public abstract void set(Timestamp value);

public abstract void set(Value value);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 1,73 @@
package org.embulk.spi.util.dynamic;

import com.google.common.collect.ImmutableSet;
import java.nio.ByteBuffer;
import org.embulk.spi.Column;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.time.Timestamp;
import org.embulk.spi.time.TimestampFormatter;
import org.msgpack.value.Value;

import java.nio.charset.Charset;

public class BinaryColumnSetter
extends AbstractDynamicColumnSetter
{
public BinaryColumnSetter(PageBuilder pageBuilder, Column column,
DefaultValueSetter defaultValue,
TimestampFormatter timestampFormatter)
{
super(pageBuilder, column, defaultValue);
this.timestampFormatter = timestampFormatter;
}

@Override
public void setNull()
{
pageBuilder.setNull(column);
}

@Override
public void set(boolean v)
{
pageBuilder.setBinary(column, Boolean.toString(v).getBytes(Charset.defaultCharset()));
}

@Override
public void set(long v)
{
pageBuilder.setBinary(column, Long.toString(v).getBytes(Charset.defaultCharset()));
}

@Override
public void set(double v)
{
pageBuilder.setBinary(column, Double.toString(v).getBytes(Charset.defaultCharset()));
}

@Override
public void set(String v)
{
pageBuilder.setBinary(column, v.getBytes(Charset.defaultCharset()));
}

@Override
public void set(ByteBuffer v)
{
pageBuilder.setBinary(column, v.asReadOnlyBuffer());
}

@Override
public void set(Timestamp v)
{
pageBuilder.setBinary(column, timestampFormatter.format(v).getBytes(Charset.defaultCharset()));
}

@Override
public void set(Value v)
{
pageBuilder.setBinary(column, v.toJson().getBytes(Charset.defaultCharset()));
}

private final TimestampFormatter timestampFormatter;
}
Loading