Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions channels/src/main/java/com/softwaremill/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.softwaremill.jox.CellState.*;
import static com.softwaremill.jox.Segment.findAndMoveForward;
import static com.softwaremill.jox.Select.select;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
Expand Down Expand Up @@ -252,6 +253,37 @@ public void send(T value) throws InterruptedException {
}
}

/**
* Kotlin provides
* https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/try-send.html
* and tryReceive() returning ChannelResult
*
* <p>Project Reactor has tryEmitNext() for similar NIO integration
*/
public boolean trySend(T value) throws InterruptedException {
var sent = select(sendClause(value), DEFAULT_NOT_SENT_CLAUSE);
return sent != DEFAULT_NOT_SENT_VALUE;
}

private static final Object DEFAULT_NOT_SENT_VALUE = new Object();
private static final DefaultClause<?> DEFAULT_NOT_SENT_CLAUSE = new DefaultClauseValue<>(DEFAULT_NOT_SENT_VALUE);

@SafeVarargs
public static <T> boolean trySend(T value, Channel<T>... toOneOfChannels) throws InterruptedException {
if (toOneOfChannels == null || toOneOfChannels.length == 0)
return false;

var selectCauses = new SelectClause[toOneOfChannels.length + 1];
for (int i = 0; i < toOneOfChannels.length; i++){
selectCauses[i] = toOneOfChannels[i].sendClause(value);
}
selectCauses[toOneOfChannels.length] = DEFAULT_NOT_SENT_CLAUSE;

var sent = select(selectCauses);
return sent != DEFAULT_NOT_SENT_VALUE;
}


@Override
public Object sendOrClosed(T value) throws InterruptedException {
return doSend(value, null, null);
Expand Down
15 changes: 11 additions & 4 deletions channels/src/main/java/com/softwaremill/jox/Select.java
Original file line number Diff line number Diff line change
Expand Up @@ -279,15 +279,22 @@ private static ChannelError getAnyChannelInError(SelectClause<?>[] clauses) {
}

public static <T> SelectClause<T> defaultClause(T value) {
return defaultClause(() -> value);
return new DefaultClauseValue<>(value);
}

@SuppressWarnings("unchecked")
public static <T> SelectClause<T> defaultClauseNull() {
return (SelectClause<T>) DEFAULT_NULL;
}

private static final DefaultClause<Void> DEFAULT_NULL = new DefaultClauseValue<Void>(null);

public static <T> SelectClause<T> defaultClause(Supplier<T> callback) {
return new DefaultClause<>(callback);
return new DefaultClauseCallback<>(callback);
}
}

class SelectInstance {
final class SelectInstance {
/**
* Possible states:
*
Expand Down Expand Up @@ -587,7 +594,7 @@ enum SelectState {
* Used to keep information about a select instance that is stored in a channel, awaiting
* completion.
*/
class StoredSelectClause {
final class StoredSelectClause {
private final SelectInstance select;
private final Segment segment;
private final int i;
Expand Down
40 changes: 26 additions & 14 deletions channels/src/main/java/com/softwaremill/jox/SelectClause.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
* <p>A clause instance is immutable and can be reused in multiple `select` calls.
*/
public abstract class SelectClause<T> {
abstract Channel<?> getChannel();
Channel<?> getChannel() {
return null;
}

/**
* @return Either a {@link StoredSelectClause}, {@link ChannelClosed} when the channel is
Expand All @@ -25,25 +27,35 @@ public abstract class SelectClause<T> {
abstract T transformedRawValue(Object rawValue);
}

final class DefaultClause<T> extends SelectClause<T> {
private final Supplier<T> callback;
abstract class DefaultClause<T> extends SelectClause<T> {
/**
* Used as a result of {@link DefaultClause#register(SelectInstance)}, instead of {@code null},
* to indicate that the default clause has been selected during registration.
*/
@Override
Object register(SelectInstance select) {
return this;
}
}

public DefaultClause(Supplier<T> callback) {
this.callback = callback;
final class DefaultClauseValue<T> extends DefaultClause<T> {
private final T value;

public DefaultClauseValue(T value) {
this.value = value;
}

@Override
Channel<?> getChannel() {
return null;
T transformedRawValue(Object rawValue) {
return value;
}
}

@Override
Object register(SelectInstance select) {
/*
* Used as a result of {@link DefaultClause#register(SelectInstance)}, instead of {@code null}, to
* indicate that the default clause has been selected during registration.
*/
return this;
final class DefaultClauseCallback<T> extends DefaultClause<T> {
private final Supplier<T> callback;

public DefaultClauseCallback(Supplier<T> callback) {
this.callback = callback;
}

@Override
Expand Down
69 changes: 68 additions & 1 deletion channels/src/test/java/com/softwaremill/jox/SelectSendTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import static com.softwaremill.jox.Select.*;
import static com.softwaremill.jox.TestUtil.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.*;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -241,11 +241,78 @@ public void testTrySendWithDefault() throws InterruptedException {

// then
assertEquals("not sent", sent);

// when
sent = select(ch1.sendClause("v2", () -> "sent"), defaultClause(() -> "not sent"));

// then
assertEquals("not sent", sent);

assertEquals("v1", ch1.receive());

// when - now there's space in the channel
var sent2 = select(ch1.sendClause("v2", () -> "sent"), defaultClause("not sent"));
assertEquals("sent", sent2);
assertEquals("v2", ch1.receive());
}

@Test
public void testTrySendWithDefaultNull() throws InterruptedException {
// given
Channel<String> ch1 = Channel.newBufferedChannel(1);
ch1.send("v1"); // the channel is now full

// when
var sent = select(ch1.sendClause("v2", () -> "sent"), defaultClauseNull());

// then
assertNull(sent);
assertEquals("v1", ch1.receive());

// when - now there's space in the channel
var sent2 = select(ch1.sendClause("v2", () -> "sent"), defaultClauseNull());
assertEquals("sent", sent2);
assertEquals("v2", ch1.receive());
}

@Test
public void testTrySend() throws InterruptedException {
// given
Channel<String> ch1 = Channel.newBufferedChannel(1);
assertTrue(ch1.trySend("v1")); // the channel is now full

// when
assertFalse(ch1.trySend("v2"));

assertEquals("v1", ch1.receive());

// when - now there's space in the channel
assertTrue(ch1.trySend("v2"));
assertEquals("v2", ch1.receive());
}

@Test
public void testTrySend2() throws InterruptedException {
assertFalse(Channel.trySend("v2", null));

// given
Channel<String> ch1 = Channel.newBufferedChannel(1);
Channel<String> ch2 = Channel.newBufferedChannel(1);
assertTrue(ch1.trySend("v1a")); // the channel is now full
assertTrue(ch2.trySend("v1b")); // the channel is now full

// when
assertFalse(ch1.trySend("v2"));
assertFalse(ch2.trySend("v2"));
assertFalse(Channel.trySend("v2", ch1, ch2));

assertEquals("v1a", ch1.receive());
assertEquals("v1b", ch2.receive());

// when - now there's space in the channel
assertTrue(Channel.trySend("v2a", ch1, ch2));
assertTrue(Channel.trySend("v2b", ch1, ch2));
assertEquals("v2a", ch1.receive());
assertEquals("v2b", ch2.receive());
}
}
33 changes: 30 additions & 3 deletions channels/src/test/java/com/softwaremill/jox/SelectTest.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package com.softwaremill.jox;

import static com.softwaremill.jox.Select.defaultClause;
import static com.softwaremill.jox.Select.defaultClauseNull;
import static com.softwaremill.jox.Select.select;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.*;

import org.junit.jupiter.api.Test;

public class SelectTest {
@Test
public void testSelectDefault() throws InterruptedException {
public void testSelectDefaultValue() throws InterruptedException {
// given
Channel<String> ch1 = Channel.newBufferedChannel(1);
Channel<String> ch2 = Channel.newBufferedChannel(1);
Expand All @@ -21,6 +21,33 @@ public void testSelectDefault() throws InterruptedException {
assertEquals("x", received);
}

@Test
public void testSelectDefaultCallback() throws InterruptedException {
// given
Channel<String> ch1 = Channel.newBufferedChannel(1);
Channel<String> ch2 = Channel.newBufferedChannel(1);

// when
String received =
select(ch1.receiveClause(), ch2.receiveClause(), defaultClause(() -> "x"));

// then
assertEquals("x", received);
}

@Test
public void testSelectDefaultNull() throws InterruptedException {
// given
Channel<String> ch1 = Channel.newBufferedChannel(1);
Channel<String> ch2 = Channel.newBufferedChannel(1);

// when
String received = select(ch1.receiveClause(), ch2.receiveClause(), defaultClauseNull());

// then
assertNull(received);
}

@Test
public void testDoNotSelectDefault() throws InterruptedException {
// given
Expand Down
Loading