As stated in the overview of the project, our goal is to use user-built Unity scenes as training environments for RL algorithms written in Python. This means that the bulk of our framework will concern communicating between Python and Unity.
To be a fully-functioning training environment, the framework must:
Allow Python to specify a scene to open for training - referred to as an experiment.
Allow Python to initialize training sessions in the scene - referred to as experiment sessions - controlling all variables (e.g. random seeds).
Send information from Unity about the state of each experiment session between Update() calls.
Allow Python to send actions back to Unity mid-training.
Notify Python when all experiment sessions have terminated, and report the final scores of the sessions.
To bring agents out of Python into a standalone Unity project we use ONNX and the Unity Barracuda plugin, since all the agents we train are neural networks. Unity Sentis could be used as an alternative.
To communicate between Unity and Python, we use a duplex, asynchronous named Windows pipe. This allows both Python and Unity to send and receive messages.
The communication protocol can be broken down into four steps: opening an experiment, initializing its experiment sessions, exchanging state and action messages, and reporting final scores.
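Concretely, each message is a newline-delimited text line, and state and score messages carry a session id prefix. A minimal sketch of how the Python side might classify an incoming line - the exact wire format here is an assumption based on the pseudocode further down, where a payload that parses as a float is treated as a final score:

```python
def classify_line(line: str) -> tuple[str, int, object]:
    """Classify a '<session_id> <payload>' line as a score or state message.

    Assumes the sketched wire format: a payload that parses as a float is a
    final score; anything else is serialized state for that session.
    """
    session_id_str, _, payload = line.partition(' ')
    session_id = int(session_id_str)
    try:
        return ('score', session_id, float(payload))
    except ValueError:
        return ('state', session_id, payload)
```

Keeping classification in one helper like this means the training loop only branches on the message kind, not on parsing details.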
One of the challenges with using a named pipe in Unity is the limited support for multithreading. Unity is not thread-safe, and by default forces all threads to execute on the main thread via its custom SynchronizationContext. This poses a problem for the "reading actions" step of the communication protocol. Since there is no set order in which experiment sessions could receive their actions, we want multiple experiment sessions to wait for actions at the same time. However, if we have all experiment sessions block until they receive an action on the main thread, we often run into a problematic situation where one session has to deal with another session's command.
If we have experiment sessions A and B waiting for their commands (in that order on the main thread) and the commands arrive in the order B then A, experiment session A will read command B before command A. If session A discards command B, session B will never receive an action and we will be permanently blocked. We need an intermediary who can multiplex the commands to their respective sessions.
Additionally, it would be convenient to parallelize reading from the pipe by moving it off the main thread. Unity does have its job system for multithreading, but jobs can only use blittable types, so they cannot hold a NamedPipeClientStream. To achieve multithreading, the current framework resets the SynchronizationContext back to the C# default.
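The intermediary can be sketched in Python: one background reader thread pulls lines off the pipe and routes each command to a per-session queue, so no session ever reads another session's command. The `readline` callable here stands in for reading the pipe, and all names are illustrative rather than the framework's actual API:

```python
import queue
import threading
from typing import Callable, Optional

def start_demultiplexer(readline: Callable[[], Optional[str]],
                        session_queues: dict[int, queue.Queue]) -> threading.Thread:
    """Route '<session_id> <command>' lines to per-session queues.

    `readline` stands in for reading the pipe; it returns None at end of stream.
    Each session blocks only on its own queue, never on the shared pipe.
    """
    def pump() -> None:
        while (line := readline()) is not None:
            session_id_str, _, command = line.partition(' ')
            session_queues[int(session_id_str)].put(command)

    reader = threading.Thread(target=pump, daemon=True)
    reader.start()
    return reader
```

With this shape, even if commands arrive as B then A, session A's queue only ever contains session A's commands, avoiding the permanent block described above.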
Fortunately, managing experiment sessions themselves is much simpler. We just need layers to abstract away different parts of turning messages sent from a pipe to actions in Unity.
The Python side of the project is much more straightforward. Because we're not required to adhere to any threading constraints, we're afforded much more flexibility in how we interface with Unity than vice versa. This project encapsulates communication with Unity into a single class. This allows the user to design their own RL algorithms independently of Unity while still re-using code for communication.
For example, here is a side-by-side of pseudocode for a genetic algorithm and a gradient-descent algorithm, both using our UnityInstance object:
pipe_path_and_name = os.path.join(r'\\.\pipe', <PIPE_NAME>)
executable_args = {
    'simulator_path': <PATH_TO_UNITY_TRAINING_BUILD>,
    'simulator_args': ['-batchmode', '-nographics', '-p',
                       <PIPE_NAME>]
}
u_instance: UnityInstance = UnityInstance(pipe_path_and_name, executable_args)

def mutate_population(population, scores) -> list[Any]:
    ...

def individual_to_json(individual) -> str:
    ...

def take_action(individual, state) -> str:
    ...

population: list[Any] = <INITIAL_POPULATION>
scores: list[float] = [None] * len(population)
epochs: int = <NUMBER_OF_EPOCHS>

for i in range(epochs):
    u_instance.run_experiment(<EXPERIMENT_NAME>)
    serialized_pop = map(individual_to_json, population)
    u_instance.send_session_initialization_data(serialized_pop)
    u_instance.end_send_initialization_data()
    while True:
        line = u_instance.read_line()
        if line is None:
            break
        line_split = line.split(' ')
        if len(line_split) > 1:
            indiv_id = int(line_split[0])
            score_parsed = True
            try:
                score = float(line_split[1])
            except ValueError:
                score_parsed = False
            if score_parsed:
                scores[indiv_id] = score
            else:
                action = take_action(population[indiv_id], line)
                u_instance.write_line(action)
                u_instance.flush()
    population = mutate_population(population, scores)

u_instance.quit()
u_instance.close_pipe()
pipe_path_and_name = os.path.join(r'\\.\pipe', <PIPE_NAME>)
executable_args = {
    'simulator_path': <PATH_TO_UNITY_TRAINING_BUILD>,
    'simulator_args': ['-batchmode', '-nographics', '-p',
                       <PIPE_NAME>]
}
u_instance: UnityInstance = UnityInstance(pipe_path_and_name, executable_args)

def gradient_descent(agent, final_scores):
    ...

def sim_args_to_json(sim_args) -> str:
    ...

def take_action(agent, state) -> str:
    ...

def create_sim_args() -> list:
    ...

agent = <INITIAL_AGENT>
sim_args: list[Any] = create_sim_args()
final_scores: list[float] = [None] * len(sim_args)
epochs: int = <NUMBER_OF_EPOCHS>

for i in range(epochs):
    u_instance.run_experiment(<EXPERIMENT_NAME>)
    serialized_args = map(sim_args_to_json, sim_args)
    u_instance.send_session_initialization_data(serialized_args)
    u_instance.end_send_initialization_data()
    while True:
        line = u_instance.read_line()
        if line is None:
            break
        line_split = line.split(' ')
        if len(line_split) > 1:
            indiv_id = int(line_split[0])
            score_parsed = True
            try:
                score = float(line_split[1])
            except ValueError:
                score_parsed = False
            if score_parsed:
                final_scores[indiv_id] = score
            else:
                action = take_action(agent, line)
                u_instance.write_line(action)
                u_instance.flush()
    gradient_descent(agent, final_scores)
    sim_args = create_sim_args()

u_instance.quit()
u_instance.close_pipe()
"""
Note: Doing gradient descent only on the final score is impractical, and you need to keep track of the actions the agent took. In practice, you would gather information when the agent takes an action, including the running score, and perform gradient descent on that.
"""
The architecture described above manages to achieve the goals we set out at the top of this page, but it doesn't come without drawbacks.
Some of the classes used in Unity for managing experiment sessions use C# generics so that they can function on arbitrary experiments. This comes at the cost of needing to write classes that inherit from all of these generic classes for each experiment, which is, admittedly, tedious.
Every time we introduce a new command, we need to rewrite some boilerplate to parse it from a JSON object - i.e. writing a struct that contains all the JSON object values, and then transforming it into a class that has helpful methods. This is closely tied to the C# Generics problem.
Because all experiment sessions currently run in the same physics world, before a physics step can be taken, all experiment sessions must have received an action command. Thus, session A must wait for session B to receive an action command before moving to the next step, even though session A has already received its command, and has all the information it needs to continue.
There are options in Unity to place game objects in their own physics worlds. However, all physics worlds may still be bound to stepping together at the same time; if so, the problem persists.
There is always the option to run multiple instances of a Unity executable in parallel.
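The lock-step constraint described above can be illustrated with a toy scheduler: the shared physics world advances only once every session has an action queued, so one slow session stalls all of them. This is a model of the constraint, not framework code:

```python
def steps_taken(action_arrivals: list[list[int]], ticks: int) -> int:
    """Count physics steps over `ticks` time units.

    `action_arrivals[s]` lists the ticks at which session `s` receives an
    action command. A physics step happens only when every session has a
    pending action, and the step consumes one action from each session.
    """
    pending = [0] * len(action_arrivals)
    steps = 0
    for t in range(ticks):
        for s, arrivals in enumerate(action_arrivals):
            pending[s] += arrivals.count(t)  # actions arriving this tick
        if all(p > 0 for p in pending):      # step only when all are ready
            pending = [p - 1 for p in pending]
            steps += 1
    return steps
```

With arrivals `[[0, 1], [3, 4]]`, session A's two early actions sit idle until session B catches up at ticks 3 and 4, so only two steps happen, both late.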
Currently, Unity assigns each experiment session one of 16 physics layers to simulate each experiment session independently. This introduces two problems:
Experiment sessions cannot have multiple layers.
There is a relatively low limit to the number of experiment sessions that can run in parallel.
There are two ways to fix this problem:
Physically separate out experiment sessions in the Unity scene. This requires that experiment sessions have bounds on how far objects in the session can travel (to prevent an object from one session interacting with an object from another session). Additionally, experiments far from the scene's origin may be subject to problems regarding low floating-point granularity for Vector3 positions.
Give each experiment its own physics world.
Most games will use the Update or Fixed Update physics simulation modes, as, barring unique mechanics, developers won't need to control the physics update rate themselves. However, our framework needs that control, so it uses features only available in the Script simulation mode. Thus, the simulation mode must be toggled between training and creating a human-playable build of the game.
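In Script mode the framework steps physics itself, and the usual pattern for that is a fixed-timestep accumulator. Sketched here in Python with a stand-in `simulate` callback - in Unity the call would be Physics.Simulate(fixedDeltaTime); everything else is illustrative:

```python
from typing import Callable

def pump_physics(accumulator: float, frame_time: float, fixed_dt: float,
                 simulate: Callable[[float], None]) -> float:
    """Advance physics by as many fixed steps as `frame_time` allows.

    Returns the leftover accumulator to carry into the next frame, as in
    the classic fixed-timestep loop that script-driven simulation requires.
    """
    accumulator += frame_time
    while accumulator >= fixed_dt:
        simulate(fixed_dt)  # in Unity: Physics.Simulate(fixed_dt)
        accumulator -= fixed_dt
    return accumulator
```

During training, the framework can call this pump only after all sessions have received actions, which is exactly the control the built-in modes don't offer.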
This design also makes the framework incompatible with games that use the Script simulation mode, as the current version assumes regular physics updates.
When receiving a line from the pipe, Unity passes the line to a stack of parsers to see what object the line was intended for. For example, consider a line intended for the dispatcher object. The line first passes to the topmost parser in the parser stack, which checks for errors and warnings. Since our line is for the dispatcher object, it should be unparseable by the errors and warnings parser; so it will move on to the next parser which is the dispatcher parser. Since the dispatcher parser can parse the line, no other parsers in the stack will try to parse the line.
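The fall-through behavior just described can be sketched as a chain of parsers tried in stack order, where the first parser that accepts a line consumes it. The names and the boolean-return convention here are illustrative, not the framework's actual types:

```python
from typing import Callable

# A parser returns True if it consumed the line, False to fall through.
Parser = Callable[[str], bool]

def dispatch(parser_stack: list[Parser], line: str) -> bool:
    """Offer `line` to parsers from the top of the stack down."""
    for parser in reversed(parser_stack):  # topmost parser first
        if parser(line):
            return True                    # consumed; stop falling through
    return False                           # no parser recognized the line
```

Because each parser sees the line only if everything above it declined, the errors-and-warnings parser can sit on top without ever stealing dispatcher lines.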
This system is good for dynamically multiplexing commands, but runs into issues regarding the timing of navigating the parser stack. The problem is that some commands change the state of the parser stack. For example, starting an experiment adds parsers to listen to the action commands of individual experiment sessions. Say we have a line to start an experiment immediately followed by a line giving an action to an experiment session. Before we can parse the second line, the parser stack must be updated to include parsers that can parse actions for experiment sessions. But how does the parser stack know that it's ready to parse the next line?
Currently, the framework makes use of an AcceptNext() method that the user must call once we're ready to parse a new line. But this is clunky and error-prone - forgetting the call can lead to indefinite blocking, as no future lines will be parsed.
There may be solutions in pre-parsing lines, checking for updates to the parser stack, and discarding pre-parsed lines if an update was made (similar to how branch prediction works). Alternatively, one could define a scope in which all changes to the parser stack must be made before it is acceptable to parse a new line - in other words, a virtual method ParserStackChanges() that is called before AcceptNext(), ensuring that AcceptNext() is always called.
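The second idea can be sketched as a template method: the base class owns the fixed call order, and subclasses only fill in the hook where stack changes happen, so AcceptNext() can never be forgotten. The method names mirror the ones proposed above, but the structure is a sketch, not the framework's implementation:

```python
class LineHandler:
    """Template method: parse, apply stack changes, then accept the next line."""

    def __init__(self) -> None:
        self.ready_for_next = False

    def handle(self, line: str) -> None:
        self.ready_for_next = False
        self.parse(line)
        self.parser_stack_changes()  # subclass hook: mutate the stack here
        self.accept_next()           # guaranteed to run after the hook

    def parse(self, line: str) -> None:
        ...

    def parser_stack_changes(self) -> None:
        ...  # default: no stack changes

    def accept_next(self) -> None:
        self.ready_for_next = True
```

A start-experiment handler would override parser_stack_changes() to push the new session parsers; by the time the next line arrives, the stack is guaranteed to be up to date.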
Resetting the SynchronizationContext to the C# default means that this framework is not compatible with games that use multithreading and rely on Unity's SynchronizationContext.