1
0
mirror of https://github.com/mik3y/usb-serial-for-android synced 2025-12-12 08:48:04 +00:00

setReadQueue(...)

for applications doing permanent read() with timeout=0, multiple buffers can be used to copy next data from Linux kernel, while the current data is processed.
This commit is contained in:
Kai Morich 2025-02-16 09:34:39 +01:00
parent c608aadc59
commit a8b9ecc7d3
9 changed files with 318 additions and 20 deletions

View File

@ -51,7 +51,7 @@ project.afterEvaluate {
// values used for local maven repo, jitpack uses github release: // values used for local maven repo, jitpack uses github release:
groupId 'com.github.mik3y' groupId 'com.github.mik3y'
artifactId 'usb-serial-for-android' artifactId 'usb-serial-for-android'
version '3.8.0beta' version '3.9.1beta'
} }
} }
} }

View File

@ -14,6 +14,7 @@ import android.content.Context;
import android.hardware.usb.UsbDevice; import android.hardware.usb.UsbDevice;
import android.hardware.usb.UsbDeviceConnection; import android.hardware.usb.UsbDeviceConnection;
import android.hardware.usb.UsbManager; import android.hardware.usb.UsbManager;
import android.hardware.usb.UsbRequest;
import android.os.Process; import android.os.Process;
import androidx.test.core.app.ApplicationProvider; import androidx.test.core.app.ApplicationProvider;
import androidx.test.platform.app.InstrumentationRegistry; import androidx.test.platform.app.InstrumentationRegistry;
@ -43,7 +44,6 @@ import com.hoho.android.usbserial.driver.UsbSerialPort.ControlLine;
import com.hoho.android.usbserial.driver.UsbSerialPort.FlowControl; import com.hoho.android.usbserial.driver.UsbSerialPort.FlowControl;
import com.hoho.android.usbserial.util.XonXoffFilter; import com.hoho.android.usbserial.util.XonXoffFilter;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assume; import org.junit.Assume;
@ -60,6 +60,7 @@ import java.io.IOException;
import java.nio.BufferOverflowException; import java.nio.BufferOverflowException;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -1200,6 +1201,66 @@ public class DeviceTest {
assertTrue(data.length() != expected.length()); assertTrue(data.length() != expected.length());
} }
@Test
public void readQueue() throws Exception {
class CountingUsbRequest extends UsbRequest {
int count;
@Override public Object getClientData() { count += 1; return super.getClientData(); }
}
CommonUsbSerialPortWrapper.setReadQueueRequestSupplier(usb.serialPort, CountingUsbRequest::new);
usb.serialPort.setReadQueue(2, 0);
assertEquals(0, usb.serialPort.getReadQueueBufferSize());
usb.open(EnumSet.of(UsbWrapper.OpenCloseFlags.NO_IOMANAGER_START));
int len = usb.serialPort.getReadEndpoint().getMaxPacketSize();
assertEquals(len, usb.serialPort.getReadQueueBufferSize());
assertEquals(2, usb.serialPort.getReadQueueBufferCount());
assertEquals(0, usb.ioManager.getReadQueueBufferCount()); // not set at port yet
assertThrows(IllegalStateException.class, () -> usb.ioManager.setReadQueue(1)); // cannot reduce bufferCount
usb.ioManager.setReadQueue(2);
usb.ioManager.start();
usb.serialPort.setReadQueue(3, 0);
usb.serialPort.setReadQueue(3, len);
usb.ioManager.setReadQueue(4);
usb.setParameters(115200, 8, 1, UsbSerialPort.PARITY_NONE);
telnet.setParameters(115200, 8, 1, UsbSerialPort.PARITY_NONE);
// linux kernel does round-robin
LinkedList<UsbRequest> requests = CommonUsbSerialPortWrapper.getReadQueueRequests(usb.serialPort);
assertNotNull(requests);
for (int i=0; i<4*4; i++) {
telnet.write(new byte[1]);
usb.read(1);
}
for (UsbRequest request : requests) {
int count = ((CountingUsbRequest)request).count;
if(usb.serialDriver instanceof FtdiSerialDriver) {
assertTrue(String.valueOf(count), count >= 4);
} else {
assertEquals(String.valueOf(count), 4, count);
}
}
usb.ioManager.setReadQueue(6);
for (int i=0; i<3*6; i++) {
telnet.write(new byte[1]);
usb.read(1);
}
for (UsbRequest request : requests) {
int count = ((CountingUsbRequest)request).count;
if(usb.serialDriver instanceof FtdiSerialDriver) {
assertTrue(String.valueOf(count), count >= 3);
} else {
assertTrue(String.valueOf(count), count == 7 || count == 3);
}
}
usb.close();
usb.open(EnumSet.of(UsbWrapper.OpenCloseFlags.NO_IOMANAGER_START));
usb.serialPort.setReadQueue(8, len);
assertThrows(IllegalStateException.class, () -> usb.serialPort.read(new byte[len], 1) ); // cannot use timeout != 0
assertThrows(IllegalStateException.class, () -> usb.serialPort.read(new byte[4], 0) ); // cannot use different length
assertThrows(IllegalStateException.class, () -> usb.ioManager.start()); // cannot reduce bufferCount
}
@Test @Test
public void readSpeed() throws Exception { public void readSpeed() throws Exception {
// see logcat for performance results // see logcat for performance results

View File

@ -1,8 +1,24 @@
package com.hoho.android.usbserial.driver; package com.hoho.android.usbserial.driver;
import android.hardware.usb.UsbRequest;
import com.hoho.android.usbserial.util.UsbUtils;
import java.util.LinkedList;
public class CommonUsbSerialPortWrapper { public class CommonUsbSerialPortWrapper {
public static byte[] getWriteBuffer(UsbSerialPort serialPort) { public static byte[] getWriteBuffer(UsbSerialPort serialPort) {
CommonUsbSerialPort commonSerialPort = (CommonUsbSerialPort) serialPort; CommonUsbSerialPort commonSerialPort = (CommonUsbSerialPort) serialPort;
return commonSerialPort.mWriteBuffer; return commonSerialPort.mWriteBuffer;
} }
public static LinkedList<UsbRequest> getReadQueueRequests(UsbSerialPort serialPort) {
CommonUsbSerialPort commonSerialPort = (CommonUsbSerialPort) serialPort;
return commonSerialPort.mReadQueueRequests;
}
public static void setReadQueueRequestSupplier(UsbSerialPort serialPort, UsbUtils.Supplier<UsbRequest> supplier) {
CommonUsbSerialPort commonSerialPort = (CommonUsbSerialPort) serialPort;
commonSerialPort.mUsbRequestSupplier = supplier;
}
} }

View File

@ -198,7 +198,6 @@ public class UsbWrapper implements SerialInputOutputManager.Listener {
serialPort.close(); serialPort.close();
} catch (Exception ignored) { } catch (Exception ignored) {
} }
//usbSerialPort = null;
} }
if(!flags.contains(OpenCloseFlags.NO_DEVICE_CONNECTION)) { if(!flags.contains(OpenCloseFlags.NO_DEVICE_CONNECTION)) {
deviceConnection = null; // closed in usbSerialPort.close() deviceConnection = null; // closed in usbSerialPort.close()

View File

@ -14,10 +14,13 @@ import android.os.Build;
import android.util.Log; import android.util.Log;
import com.hoho.android.usbserial.util.MonotonicClock; import com.hoho.android.usbserial.util.MonotonicClock;
import com.hoho.android.usbserial.util.UsbUtils;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.LinkedList;
import java.util.Objects;
/** /**
* A base class shared by several driver implementations. * A base class shared by several driver implementations.
@ -38,8 +41,12 @@ public abstract class CommonUsbSerialPort implements UsbSerialPort {
protected UsbDeviceConnection mConnection; protected UsbDeviceConnection mConnection;
protected UsbEndpoint mReadEndpoint; protected UsbEndpoint mReadEndpoint;
protected UsbEndpoint mWriteEndpoint; protected UsbEndpoint mWriteEndpoint;
protected UsbRequest mUsbRequest; protected UsbRequest mReadRequest;
protected LinkedList<UsbRequest> mReadQueueRequests;
private int mReadQueueBufferCount;
private int mReadQueueBufferSize;
protected FlowControl mFlowControl = FlowControl.NONE; protected FlowControl mFlowControl = FlowControl.NONE;
protected UsbUtils.Supplier<UsbRequest> mUsbRequestSupplier = UsbRequest::new; // override for testing
/** /**
* Internal write buffer. * Internal write buffer.
@ -110,6 +117,52 @@ public abstract class CommonUsbSerialPort implements UsbSerialPort {
} }
} }
@Override
public void setReadQueue(int bufferCount, int bufferSize) {
if (bufferCount < 0) {
throw new IllegalArgumentException("Invalid bufferCount");
}
if (bufferSize < 0) {
throw new IllegalArgumentException("Invalid bufferSize");
}
if(isOpen()) {
if (bufferCount < mReadQueueBufferCount) {
throw new IllegalStateException("Cannot reduce bufferCount when port is open");
}
if (bufferSize == 0) {
bufferSize = mReadEndpoint.getMaxPacketSize();
}
if (mReadQueueBufferSize == 0) {
mReadQueueBufferSize = mReadEndpoint.getMaxPacketSize();
}
if (mReadQueueBufferCount != 0 && bufferSize != mReadQueueBufferSize) {
throw new IllegalStateException("Cannot change bufferSize when port is open");
}
if (bufferCount > 0) {
if (mReadQueueRequests == null) {
mReadQueueRequests = new LinkedList<>();
}
for (int i = mReadQueueRequests.size(); i < bufferCount; i++) {
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
UsbRequest request = mUsbRequestSupplier.get();
request.initialize(mConnection, mReadEndpoint);
request.setClientData(buffer);
request.queue(buffer, bufferSize);
mReadQueueRequests.add(request);
}
}
}
mReadQueueBufferCount = bufferCount;
mReadQueueBufferSize = bufferSize;
}
@Override
public int getReadQueueBufferCount() { return mReadQueueBufferCount; }
@Override
public int getReadQueueBufferSize() { return mReadQueueBufferSize; }
private boolean useReadQueue() { return mReadQueueBufferCount != 0; }
@Override @Override
public void open(UsbDeviceConnection connection) throws IOException { public void open(UsbDeviceConnection connection) throws IOException {
if (mConnection != null) { if (mConnection != null) {
@ -125,8 +178,9 @@ public abstract class CommonUsbSerialPort implements UsbSerialPort {
if (mReadEndpoint == null || mWriteEndpoint == null) { if (mReadEndpoint == null || mWriteEndpoint == null) {
throw new IOException("Could not get read & write endpoints"); throw new IOException("Could not get read & write endpoints");
} }
mUsbRequest = new UsbRequest(); mReadRequest = mUsbRequestSupplier.get();
mUsbRequest.initialize(mConnection, mReadEndpoint); mReadRequest.initialize(mConnection, mReadEndpoint);
setReadQueue(mReadQueueBufferCount, mReadQueueBufferSize); // fill mReadQueueRequests
ok = true; ok = true;
} finally { } finally {
if (!ok) { if (!ok) {
@ -144,11 +198,19 @@ public abstract class CommonUsbSerialPort implements UsbSerialPort {
if (mConnection == null) { if (mConnection == null) {
throw new IOException("Already closed"); throw new IOException("Already closed");
} }
UsbRequest usbRequest = mUsbRequest; UsbRequest readRequest = mReadRequest;
mUsbRequest = null; mReadRequest = null;
try { try {
usbRequest.cancel(); readRequest.cancel();
} catch(Exception ignored) {} } catch(Exception ignored) {}
if(mReadQueueRequests != null) {
for(UsbRequest readQueueRequest : mReadQueueRequests) {
try {
readQueueRequest.cancel();
} catch(Exception ignored) {}
}
mReadQueueRequests = null;
}
try { try {
closeInt(); closeInt();
} catch(Exception ignored) {} } catch(Exception ignored) {}
@ -168,7 +230,7 @@ public abstract class CommonUsbSerialPort implements UsbSerialPort {
} }
protected void testConnection(boolean full, String msg) throws IOException { protected void testConnection(boolean full, String msg) throws IOException {
if(mUsbRequest == null) { if(mReadRequest == null) {
throw new IOException("Connection closed"); throw new IOException("Connection closed");
} }
if(!full) { if(!full) {
@ -199,6 +261,9 @@ public abstract class CommonUsbSerialPort implements UsbSerialPort {
length = Math.min(length, dest.length); length = Math.min(length, dest.length);
final int nread; final int nread;
if (timeout != 0) { if (timeout != 0) {
if(useReadQueue()) {
throw new IllegalStateException("Cannot use timeout!=0 if readQueue is enabled");
}
// bulkTransfer will cause data loss with short timeout + high baud rates + continuous transfer // bulkTransfer will cause data loss with short timeout + high baud rates + continuous transfer
// https://stackoverflow.com/questions/9108548/android-usb-host-bulktransfer-is-losing-data // https://stackoverflow.com/questions/9108548/android-usb-host-bulktransfer-is-losing-data
// but mConnection.requestWait(timeout) available since Android 8.0 es even worse, // but mConnection.requestWait(timeout) available since Android 8.0 es even worse,
@ -216,15 +281,31 @@ public abstract class CommonUsbSerialPort implements UsbSerialPort {
testConnection(MonotonicClock.millis() < endTime); testConnection(MonotonicClock.millis() < endTime);
} else { } else {
final ByteBuffer buf = ByteBuffer.wrap(dest, 0, length); ByteBuffer buf = null;
if (!mUsbRequest.queue(buf, length)) { if(useReadQueue()) {
throw new IOException("Queueing USB request failed"); if (length != mReadQueueBufferSize) {
throw new IllegalStateException("Cannot use different length if readQueue is enabled");
}
} else {
buf = ByteBuffer.wrap(dest, 0, length);
if (!mReadRequest.queue(buf, length)) {
throw new IOException("Queueing USB request failed");
}
} }
final UsbRequest response = mConnection.requestWait(); final UsbRequest response = mConnection.requestWait();
if (response == null) { if (response == null) {
throw new IOException("Waiting for USB request failed"); throw new IOException("Waiting for USB request failed");
} }
nread = buf.position(); if(useReadQueue()) {
buf = (ByteBuffer) response.getClientData();
System.arraycopy(buf.array(), 0, dest, 0, buf.position());
if(mReadRequest != null) { // re-queue if connection not closed
if (!response.queue(buf, buf.capacity())) {
throw new IOException("Queueing USB request failed");
}
}
}
nread = Objects.requireNonNull(buf).position();
// Android error propagation is improvable: // Android error propagation is improvable:
// response != null & nread == 0 can be: connection lost, buffer to small, ??? // response != null & nread == 0 can be: connection lost, buffer to small, ???
if(nread == 0) { if(nread == 0) {
@ -297,7 +378,7 @@ public abstract class CommonUsbSerialPort implements UsbSerialPort {
@Override @Override
public boolean isOpen() { public boolean isOpen() {
return mUsbRequest != null; return mReadRequest != null;
} }
@Override @Override

View File

@ -104,6 +104,23 @@ public interface UsbSerialPort extends Closeable {
*/ */
String getSerial(); String getSerial();
/**
* Applications doing permanent {@link #read} with timeout=0 can reduce data loss likelihood
* at high baud rate and continuous data transfer by using multiple buffers to copy next data
* from Linux kernel, while the current data is processed.
* When enabled, {@link #read} can not be called with timeout!=0 or different buffer size.
*
* @param bufferCount number of buffers to use for readQueue.
* Use 0 to disable.
* @param bufferSize size of each buffer.
* Use 0 for optimal size (= getReadEndpoint().getMaxPacketSize()).
* @throws IllegalStateException if port is open and buffer count should be lowered or
* buffer size should be changed.
*/
void setReadQueue(int bufferCount, int bufferSize);
int getReadQueueBufferCount();
int getReadQueueBufferSize();
/** /**
* Opens and initializes the port. Upon success, caller must ensure that * Opens and initializes the port. Upon success, caller must ensure that
* {@link #close()} is eventually called. * {@link #close()} is eventually called.

View File

@ -33,16 +33,18 @@ public class SerialInputOutputManager {
public static boolean DEBUG = false; public static boolean DEBUG = false;
private static final String TAG = SerialInputOutputManager.class.getSimpleName(); private static final String TAG = SerialInputOutputManager.class.getSimpleName();
private static final int BUFSIZ = 4096; private static final int WRITE_BUFFER_SIZE = 4096;
private int mReadTimeout = 0; private int mReadTimeout = 0;
private int mWriteTimeout = 0; private int mWriteTimeout = 0;
private int mReadQueueBufferCount = 0;
// no mReadQueueBufferSize, using mReadBuffer.size instead
private final Object mReadBufferLock = new Object(); private final Object mReadBufferLock = new Object();
private final Object mWriteBufferLock = new Object(); private final Object mWriteBufferLock = new Object();
private ByteBuffer mReadBuffer; // default size = getReadEndpoint().getMaxPacketSize() private ByteBuffer mReadBuffer; // default size = getReadEndpoint().getMaxPacketSize()
private ByteBuffer mWriteBuffer = ByteBuffer.allocate(BUFSIZ); private ByteBuffer mWriteBuffer = ByteBuffer.allocate(WRITE_BUFFER_SIZE);
private int mThreadPriority = Process.THREAD_PRIORITY_URGENT_AUDIO; private int mThreadPriority = Process.THREAD_PRIORITY_URGENT_AUDIO;
private final AtomicReference<State> mState = new AtomicReference<>(State.STOPPED); private final AtomicReference<State> mState = new AtomicReference<>(State.STOPPED);
@ -68,9 +70,8 @@ public class SerialInputOutputManager {
} }
public SerialInputOutputManager(UsbSerialPort serialPort, Listener listener) { public SerialInputOutputManager(UsbSerialPort serialPort, Listener listener) {
mSerialPort = serialPort; this(serialPort);
mListener = listener; mListener = listener;
mReadBuffer = ByteBuffer.allocate(serialPort.getReadEndpoint().getMaxPacketSize());
} }
public synchronized void setListener(Listener listener) { public synchronized void setListener(Listener listener) {
@ -145,6 +146,20 @@ public class SerialInputOutputManager {
return mWriteBuffer.capacity(); return mWriteBuffer.capacity();
} }
/**
* Set read queue, similar to {@link UsbSerialPort#setReadQueue}
* except buffer size to be set before with {@link #setReadBufferSize}.
*
* @param bufferCount number of buffers to use for readQueue,
* disable with value 0
*/
public void setReadQueue(int bufferCount) {
mSerialPort.setReadQueue(bufferCount, getReadBufferSize());
mReadQueueBufferCount = bufferCount; // only store if set ok
}
public int getReadQueueBufferCount() { return mReadQueueBufferCount; }
/** /**
* write data asynchronously * write data asynchronously
*/ */
@ -159,6 +174,7 @@ public class SerialInputOutputManager {
* start SerialInputOutputManager in separate threads * start SerialInputOutputManager in separate threads
*/ */
public void start() { public void start() {
mSerialPort.setReadQueue(mReadQueueBufferCount, getReadBufferSize());
if(mState.compareAndSet(State.STOPPED, State.STARTING)) { if(mState.compareAndSet(State.STOPPED, State.STARTING)) {
mStartuplatch = new CountDownLatch(2); mStartuplatch = new CountDownLatch(2);
new Thread(this::runRead, this.getClass().getSimpleName() + "_read").start(); new Thread(this::runRead, this.getClass().getSimpleName() + "_read").start();

View File

@ -6,6 +6,11 @@ import java.util.ArrayList;
public class UsbUtils { public class UsbUtils {
// copied from java.util.function.Supplier which is not available < API 24
public interface Supplier<T> {
T get();
}
private UsbUtils() { private UsbUtils() {
} }
@ -29,5 +34,4 @@ public class UsbUtils {
return descriptors; return descriptors;
} }
} }

View File

@ -0,0 +1,104 @@
package com.hoho.android.usbserial.driver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import android.hardware.usb.UsbDevice;
import android.hardware.usb.UsbDeviceConnection;
import android.hardware.usb.UsbEndpoint;
import android.hardware.usb.UsbRequest;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class CommonUsbSerialPortTest {
static class DummySerialDriver implements UsbSerialDriver {
ArrayList<UsbSerialPort> ports = new ArrayList<>();
DummySerialDriver() { ports.add(new DummySerialPort(null, 0)); }
@Override
public UsbDevice getDevice() { return null; }
@Override
public List<UsbSerialPort> getPorts() { return ports; }
}
static class DummySerialPort extends CommonUsbSerialPort {
public DummySerialPort(UsbDevice device, int portNumber) {
super(device, portNumber);
mUsbRequestSupplier = DummyUsbRequest::new;
}
@Override
protected void openInt() throws IOException { mReadEndpoint = mWriteEndpoint = mock(UsbEndpoint.class); }
@Override
protected void closeInt() { }
@Override
public UsbSerialDriver getDriver() { return null; }
@Override
public void setParameters(int baudRate, int dataBits, int stopBits, int parity) throws IOException { }
}
static class DummyUsbRequest extends UsbRequest {
@Override
public boolean initialize(UsbDeviceConnection connection, UsbEndpoint endpoint) { return true; }
@Override
public void setClientData(Object data) { }
@Override
public boolean queue(ByteBuffer buffer, int length) { return true; }
}
@Test
public void readQueue() throws Exception {
UsbDeviceConnection usbDeviceConnection = mock(UsbDeviceConnection.class);
DummySerialDriver driver = new DummySerialDriver();
CommonUsbSerialPort port = (CommonUsbSerialPort) driver.getPorts().get(0);
// set before open
port.setReadQueue(0, 0);
assertThrows(IllegalArgumentException.class, () -> port.setReadQueue(-1, 1));
assertThrows(IllegalArgumentException.class, () -> port.setReadQueue(1, -1));
port.setReadQueue(2, 256);
assertNull(port.mReadQueueRequests);
// change after open
port.open(usbDeviceConnection);
assertNotNull(port.mReadQueueRequests);
assertEquals(2, port.mReadQueueRequests.size());
assertThrows(IllegalStateException.class, () -> port.setReadQueue(1, 256));
port.setReadQueue(3, 256);
assertEquals(3, port.mReadQueueRequests.size());
assertThrows(IllegalStateException.class, () -> port.setReadQueue(3, 128));
assertThrows(IllegalStateException.class, () -> port.setReadQueue(3, 512));
// set after open
port.close();
port.setReadQueue(0, 0);
port.open(usbDeviceConnection);
assertNull(port.mReadQueueRequests);
port.setReadQueue(3, 256);
// retain over close
port.close();
assertNull(port.mReadQueueRequests);
port.open(usbDeviceConnection);
assertEquals(3, port.mReadQueueRequests.size());
}
}