Skip to content
108 changes: 108 additions & 0 deletions lib/src/main/java/io/ably/lib/object/LiveObjectsPlugin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package io.ably.lib.object;

import io.ably.lib.object.adapter.AblyClientAdapter;
import io.ably.lib.object.adapter.Adapter;
import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.realtime.ChannelState;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.util.Log;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.lang.reflect.InvocationTargetException;

/**
* The LiveObjectsPlugin interface provides a mechanism for managing and interacting with
* live data objects in a real-time environment. It allows for the retrieval, disposal, and
* management of Objects instances associated with specific channel names.
*/
public interface LiveObjectsPlugin {

/**
* Retrieves an instance of RealtimeObjects associated with the specified channel name.
* This method ensures that a RealtimeObjects instance is available for the given channel,
* creating one if it does not already exist.
*
* @param channelName the name of the channel for which the RealtimeObjects instance is to be retrieved.
* @return the RealtimeObjects instance associated with the specified channel name.
*/
@NotNull
RealtimeObject getInstance(@NotNull String channelName);

/**
* Handles a protocol message.
* This method is invoked whenever a protocol message is received, allowing the implementation
* to process the message and take appropriate actions.
*
* @param message the protocol message to handle.
*/
void handle(@NotNull ProtocolMessage message);

/**
* Handles state changes for a specific channel.
* This method is invoked whenever a channel's state changes, allowing the implementation
* to update the RealtimeObjects instances accordingly based on the new state and presence of objects.
*
* @param channelName the name of the channel whose state has changed.
* @param state the new state of the channel.
* @param hasObjects flag indicates whether the channel has any associated objects.
*/
void handleStateChange(@NotNull String channelName, @NotNull ChannelState state, boolean hasObjects);

/**
* Disposes of the RealtimeObjects instance associated with the specified channel name.
* This method removes the RealtimeObjects instance for the given channel, releasing any
* resources associated with it.
* This is invoked when ablyRealtimeClient.channels.release(channelName) is called
*
* @param channelName the name of the channel whose RealtimeObjects instance is to be removed.
*/
void dispose(@NotNull String channelName);

/**
* Disposes of the plugin instance and all underlying resources.
* This is invoked when ablyRealtimeClient.close() is called
*/
void dispose();

/**
* Attempts to initialize the LiveObjects plugin by reflectively loading its implementation
* from the classpath. Returns a new plugin instance on every successful invocation, or
* {@code null} if the LiveObjects plugin is not present in the classpath.
*
* @param ablyRealtime the AblyRealtime client used to build the adapter the plugin runs against.
* @return a new {@link LiveObjectsPlugin} instance, or {@code null} if the plugin is unavailable.
*/
@Nullable
static LiveObjectsPlugin tryInitialize(@NotNull AblyRealtime ablyRealtime) {
return Factory.create(ablyRealtime);
}

/**
* Reflectively constructs the LiveObjects plugin implementation. Lives in a nested class so the
* implementation-class name stays {@code private} (interface fields are forced {@code public}),
* mirroring {@link io.ably.lib.object.serialization.ObjectSerializer.Holder}. Unlike {@code Holder}
* this is stateless: {@link #create} returns a new instance on every call.
*/
final class Factory {
private static final String TAG = LiveObjectsPlugin.Factory.class.getName();
private static final String IMPLEMENTATION_CLASS = "io.ably.lib.object.DefaultLiveObjectsPlugin";

private Factory() {}

@Nullable
static LiveObjectsPlugin create(@NotNull AblyRealtime ablyRealtime) {
try {
Class<?> objectsImplementation = Class.forName(IMPLEMENTATION_CLASS);
AblyClientAdapter adapter = new Adapter(ablyRealtime);
return (LiveObjectsPlugin) objectsImplementation
.getDeclaredConstructor(AblyClientAdapter.class)
.newInstance(adapter);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException |
InvocationTargetException e) {
Log.i(TAG, "LiveObjects plugin not found in classpath. LiveObjects functionality will not be available.", e);
return null;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.ably.lib.object.serialization;

import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.gson.JsonParseException;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import io.ably.lib.util.Log;

import java.lang.reflect.Type;

public class ObjectJsonSerializer implements JsonSerializer<Object[]>, JsonDeserializer<Object[]> {
private static final String TAG = ObjectJsonSerializer.class.getName();

@Override
public Object[] deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
ObjectSerializer serializer = ObjectSerializer.tryGet();
if (serializer == null) {
Log.w(TAG, "Skipping 'state' field json deserialization because ObjectSerializer not found.");
return null;
}
Comment thread
Copilot marked this conversation as resolved.
if (!json.isJsonArray()) {
throw new JsonParseException("Expected a JSON array for 'state' field, but got: " + json);
}
return serializer.readFromJsonArray(json.getAsJsonArray());
}

@Override
public JsonElement serialize(Object[] src, Type typeOfSrc, JsonSerializationContext context) {
ObjectSerializer serializer = ObjectSerializer.tryGet();
if (serializer == null) {
Log.w(TAG, "Skipping 'state' field json serialization because ObjectSerializer not found.");
return JsonNull.INSTANCE;
}
Comment thread
Copilot marked this conversation as resolved.
return serializer.asJsonArray(src);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package io.ably.lib.object.serialization;

import com.google.gson.JsonArray;
import io.ably.lib.util.Log;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.msgpack.core.MessagePacker;
import org.msgpack.core.MessageUnpacker;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;

/**
* Serializer interface for converting between objects and their MessagePack or JSON representations.
*/
public interface ObjectSerializer {

/**
* Reads a MessagePack array from the given unpacker and deserializes it into an Object array.
*
* @param unpacker the MessageUnpacker to read from
* @return the deserialized Object array
* @throws IOException if an I/O error occurs during unpacking
*/
@NotNull
Object[] readMsgpackArray(@NotNull MessageUnpacker unpacker) throws IOException;

/**
* Serializes the given Object array as a MessagePack array using the provided packer.
*
* @param objects the Object array to serialize
* @param packer the MessagePacker to write to
* @throws IOException if an I/O error occurs during packing
*/
void writeMsgpackArray(@NotNull Object[] objects, @NotNull MessagePacker packer) throws IOException;

/**
* Reads a JSON array from the given {@link JsonArray} and deserializes it into an Object array.
*
* @param json the {@link JsonArray} representing the array to deserialize
* @return the deserialized Object array
*/
@NotNull
Object[] readFromJsonArray(@NotNull JsonArray json);

/**
* Serializes the given Object array as a JSON array.
*
* @param objects the Object array to serialize
* @return the resulting JsonArray
*/
@NotNull
JsonArray asJsonArray(@NotNull Object[] objects);

/**
* Returns the lazily-initialized, process-wide {@link ObjectSerializer} singleton, reflectively
* loaded from the LiveObjects plugin on the classpath. Returns {@code null} if the plugin is not
* present; the lookup is retried on subsequent calls until it succeeds.
*
* @return the shared {@link ObjectSerializer} instance, or {@code null} if the plugin is unavailable.
*/
@Nullable
static ObjectSerializer tryGet() {
return Holder.getSerializer();
}

/**
* Holds the lazily-initialized {@link ObjectSerializer} singleton. Interfaces cannot declare
* mutable static fields, so the cache lives here while {@link #tryGet()} delegates to it.
*/
final class Holder {
private static final String TAG = ObjectSerializer.Holder.class.getName();
private static final String IMPLEMENTATION_CLASS = "io.ably.lib.object.serialization.DefaultObjectsSerializer";
private static volatile ObjectSerializer objectsSerializer;

private Holder() {}

@Nullable
static ObjectSerializer getSerializer() {
if (objectsSerializer == null) {
synchronized (Holder.class) {
if (objectsSerializer == null) { // Double-Checked Locking (DCL)
try {
Class<?> serializerClass = Class.forName(IMPLEMENTATION_CLASS);
objectsSerializer = (ObjectSerializer) serializerClass.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException |
NoSuchMethodException |
InvocationTargetException e) {
Log.w(TAG, "Failed to init ObjectSerializer, LiveObjects plugin not included in the classpath", e);
return null;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
Comment thread
Copilot marked this conversation as resolved.
}
}
}
return objectsSerializer;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package io.ably.lib.`object`.message
import com.google.gson.Gson
import com.google.gson.JsonElement
import com.google.gson.JsonObject
import com.google.gson.annotations.JsonAdapter
import com.google.gson.annotations.SerializedName
import io.ably.lib.`object`.serialization.WireObjectDataJsonSerializer
import java.nio.charset.StandardCharsets
import java.util.Base64

Expand Down Expand Up @@ -36,6 +39,7 @@ internal enum class WireObjectsMapSemantics(val code: Int) {
}

/** Spec: OD1, OD2 - binary carried as base64 string on the wire */
@JsonAdapter(WireObjectDataJsonSerializer::class)
internal data class WireObjectData(
val objectId: String? = null, // OD2a
val string: String? = null, // OD2f
Expand Down Expand Up @@ -145,6 +149,7 @@ internal data class WireObjectMessage(
val connectionId: String? = null, // OM2c
val extras: JsonObject? = null, // OM2d
val operation: WireObjectOperation? = null, // OM2f
@SerializedName("object")
val objectState: WireObjectState? = null, // OM2g - wire key "object"
val serial: String? = null, // OM2h
val serialTimestamp: Long? = null, // OM2j
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.ably.lib.`object`.serialization

import com.google.gson.*
import io.ably.lib.`object`.message.WireObjectMessage
import org.msgpack.core.MessagePacker
import org.msgpack.core.MessageUnpacker

/**
* Default implementation of {@link ObjectSerializer} that handles serialization/deserialization
* of WireObjectMessage arrays for both JSON and MessagePack formats using Gson and MessagePack.
* Dynamically loaded by ObjectSerializer#tryGet() to avoid hard dependencies.
*/
@Suppress("unused") // Used via reflection in ObjectSerializer.Holder
internal class DefaultObjectsSerializer : ObjectSerializer {

override fun readMsgpackArray(unpacker: MessageUnpacker): Array<Any> {
val objectMessagesCount = unpacker.unpackArrayHeader()
return Array(objectMessagesCount) { readObjectMessage(unpacker) }
Comment thread
sacOO7 marked this conversation as resolved.
}

override fun writeMsgpackArray(objects: Array<out Any>, packer: MessagePacker) {
val objectMessages = objects.map { it as WireObjectMessage }
packer.packArrayHeader(objectMessages.size)
objectMessages.forEach { it.writeMsgpack(packer) }
}

override fun readFromJsonArray(json: JsonArray): Array<Any> {
return json.map { element ->
if (element.isJsonObject) element.asJsonObject.toObjectMessage()
else throw JsonParseException("Expected JsonObject, but found: $element")
}.toTypedArray()
}

override fun asJsonArray(objects: Array<out Any>): JsonArray {
val objectMessages = objects.map { it as WireObjectMessage }
val jsonArray = JsonArray()
for (objectMessage in objectMessages) {
jsonArray.add(objectMessage.toJsonObject())
}
return jsonArray
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.ably.lib.`object`.serialization

import com.google.gson.*
import io.ably.lib.`object`.message.WireObjectData
import io.ably.lib.`object`.message.WireObjectMessage
import io.ably.lib.`object`.message.WireObjectOperationAction
import io.ably.lib.`object`.message.WireObjectsMapSemantics
import java.lang.reflect.Type
import kotlin.enums.EnumEntries

// Gson instance for JSON serialization/deserialization
internal val gson = GsonBuilder()
.registerTypeAdapter(WireObjectOperationAction::class.java, EnumCodeTypeAdapter({ it.code }, WireObjectOperationAction.entries))
.registerTypeAdapter(WireObjectsMapSemantics::class.java, EnumCodeTypeAdapter({ it.code }, WireObjectsMapSemantics.entries))
.create()

internal fun WireObjectMessage.toJsonObject(): JsonObject {
return gson.toJsonTree(this).asJsonObject
}

internal fun JsonObject.toObjectMessage(): WireObjectMessage {
return gson.fromJson(this, WireObjectMessage::class.java)
}

internal class EnumCodeTypeAdapter<T : Enum<T>>(
private val getCode: (T) -> Int,
private val enumValues: EnumEntries<T>
) : JsonSerializer<T>, JsonDeserializer<T> {

override fun serialize(src: T, typeOfSrc: Type, context: JsonSerializationContext): JsonElement {
return JsonPrimitive(getCode(src))
}

override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext): T {
val code = json.asInt
return enumValues.firstOrNull { getCode(it) == code } ?: enumValues.firstOrNull { getCode(it) == -1 }
?: throw JsonParseException("Unknown enum code: $code and no Unknown fallback found")
}
}

internal class WireObjectDataJsonSerializer : JsonSerializer<WireObjectData>, JsonDeserializer<WireObjectData> {
override fun serialize(src: WireObjectData, typeOfSrc: Type?, context: JsonSerializationContext?): JsonElement {
val obj = JsonObject()
src.objectId?.let { obj.addProperty("objectId", it) }
src.string?.let { obj.addProperty("string", it) }
src.number?.let { obj.addProperty("number", it) }
src.boolean?.let { obj.addProperty("boolean", it) }
src.bytes?.let { obj.addProperty("bytes", it) }
src.json?.let { obj.addProperty("json", it.toString()) } // Spec: OD4c5
return obj
}

override fun deserialize(json: JsonElement, typeOfT: Type?, context: JsonDeserializationContext?): WireObjectData {
val obj = if (json.isJsonObject) json.asJsonObject else throw JsonParseException("Expected JsonObject")
val objectId = if (obj.has("objectId")) obj.get("objectId").asString else null
val string = if (obj.has("string")) obj.get("string").asString else null
val number = if (obj.has("number")) obj.get("number").asDouble else null
val boolean = if (obj.has("boolean")) obj.get("boolean").asBoolean else null
val bytes = if (obj.has("bytes")) obj.get("bytes").asString else null
val json = if (obj.has("json")) JsonParser.parseString(obj.get("json").asString) else null

if (objectId == null && string == null && number == null && boolean == null && bytes == null && json == null) {
throw JsonParseException("Since objectId is not present, at least one of the value fields must be present")
}
return WireObjectData(objectId = objectId, string = string, number = number, boolean = boolean, bytes = bytes, json = json)
}
}
Loading
Loading