Writing the Python Script
The script you use with the Python module converts an incoming JSON file, created from your graph, into an outgoing JSON file that is delivered to the user.
This 'transformation' can be anything - from a simple calculation to a complete Machine Learning analysis.
How It Works
Your script is called by ARDI, and is sent the data from your graph through standard input, as a JSON file.
As an example, if you had both a Temperature and a Pressure Python Input node, you'd be given a file like the one below…
{ "batches": { 0: { "sharpstart": 0, "sharpend": 0, "data": [ { "time": 20993044, "temperature": 22.5, "pressure": 2912 } ..... } } }
The data is broken up into batches, which then have a list of samples, with a timestamp (in UTC Epoch Seconds) and values for each of your inputs.
You can process this data any way you'd like.
Your output should also be a JSON file, which will be merged with the existing analytic query data and returned to the user.
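For example, the uncached script below builds a list of maximum speeds per customer, so its output (with illustrative values) would look something like this…

{
    "customer_orders": [
        { "customer": "Acme", "maxspeed": 4.1 },
        { "customer": "Birchwood", "maxspeed": 3.6 }
    ]
}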
Inputs
You can specify what inputs your Python script needs using our special comments.
You can also optionally choose the caching method (covered later).
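For example, a script that needs the Customer and Speed properties from your graph (like the uncached example below) declares them at the top of the file…

#+Customer
#+Speed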
Uncached Example Code
import sys
import json

#These are the input channels.
#+Customer
#+Speed

#Read in the standard input
content = sys.stdin.read()

#Convert the input to JSON
content = json.loads(content)

items = []
final = {}

#Process each batch.
for batch in content['batches'].values():

    customer = None
    maxspeed = None

    #Scan every second of data in the batch
    for n in batch['data']:

        if customer is None:
            #This is the first time we've run this.
            customer = n['customer']
            maxspeed = n['speed']
        else:
            if n['customer'] == customer:
                if n['speed'] > maxspeed:
                    maxspeed = n['speed']
            else:
                #The customer has changed - record the finished one.
                thing = {}
                thing['customer'] = customer
                thing['maxspeed'] = maxspeed
                items.append(thing)

                customer = n['customer']
                maxspeed = n['speed']

    #Record the last customer of the batch.
    if customer is not None:
        thing = {}
        thing['customer'] = customer
        thing['maxspeed'] = maxspeed
        items.append(thing)

#Assign the results to our output
final['customer_orders'] = items

#Write the JSON formatted final data
print(json.dumps(final))
This code goes through every batch, and through every point of data in that batch.
It then returns the maximum speed value it could find for each customer.
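Because the script simply reads standard input and prints to standard output, you can test it outside ARDI by piping a JSON file into it. The sketch below does this from Python - the maxspeed.py filename and the sample values are invented for the example.

import json
import subprocess

#A minimal, hypothetical payload matching the input format shown earlier.
payload = {
    "batches": {
        "0": {
            "sharpstart": 0,
            "sharpend": 0,
            "data": [
                {"time": 20993044, "customer": "A", "speed": 3.2},
                {"time": 20993045, "customer": "A", "speed": 4.1},
                {"time": 20993046, "customer": "B", "speed": 2.7}
            ]
        }
    }
}

#Feed the payload to the script over standard input, as ARDI would.
result = subprocess.run(["python", "maxspeed.py"],
                        input=json.dumps(payload),
                        capture_output=True, text=True)

print(result.stdout)
#Expected: {"customer_orders": [{"customer": "A", "maxspeed": 4.1},
#                               {"customer": "B", "maxspeed": 2.7}]}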
Caching
If you're not using caching, all of your data will come in as a single 'batch'.
If you're creating events and would like to take advantage of caching, your data might be broken up into two or more batches - these are the uncached regions of time that need to be filled in.
It is important that you…
- Prevent any data from carrying over from one batch to the next. You usually do this by clearing any temporary values that persist between loops in your analytic.
- Add a “Complete” property to each of your events, and only set it to '1' if you're sure that you have captured the entire event rather than just the start or end.
- Pay attention to the sharpstart and sharpend properties of the batch. A 'sharp' start or end indicates that it runs straight into an already-cached time period. If your event is already underway at the start of a batch where sharpstart is 1 and it continues all the way through to the end where sharpend is also 1, the event should be marked as complete.
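As a rough illustration (the values are invented), a cached query might deliver two uncached regions like this - the first backing onto cached data at its start, the second at its end…

{
    "batches": {
        "0": { "sharpstart": 1, "sharpend": 0, "data": [ ..... ] },
        "1": { "sharpstart": 0, "sharpend": 1, "data": [ ..... ] }
    }
}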
The code below records each 'stop' event (where the speed is below 0.1 m/s), along with the active batch number at that moment.
Caching Example Code
import sys
import json

#These are the input and cache channels.
#+Batch
#+Speed
#+Cache=stops|batch:int

#Read in the standard input
content = sys.stdin.read()

#Convert the input to JSON
content = json.loads(content)

stoppages = []
final = {}

#Process each batch.
for batch in content['batches'].values():

    stopstart = None
    first = True

    #Scan every second of data in the batch
    for n in batch['data']:

        if abs(n['speed']) < 0.1:
            #We've stopped - start a new event if one isn't already open.
            if stopstart is None:
                stopstart = {}
                stopstart['start'] = n['time']
                stopstart['complete'] = 0
                stopstart['batch'] = n['batch']

                #The first event might be very incomplete.
                if first:
                    if batch['sharpstart'] == 0:
                        stopstart['partial'] = 1
        else:
            if stopstart is not None:
                #Write out the end of the event.
                stopstart['end'] = n['time']
                stopstart['complete'] = 1
                stoppages.append(stopstart)
                stopstart = None

        first = False

    #If we finished with an incomplete event, make sure it's marked incomplete.
    if stopstart is not None:
        stopstart['end'] = batch['data'][len(batch['data'])-1]['time']
        stopstart['complete'] = 0

        #A sharp end means the event runs into already-cached data,
        #so it counts as complete unless its start was also cut off.
        if batch['sharpend'] == 1:
            if 'partial' not in stopstart:
                stopstart['complete'] = 1

        stoppages.append(stopstart)

#Assign the stoppages to our array
final['stops'] = stoppages

#Write the JSON formatted final data
print(json.dumps(final))
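The merged result returned to the user would then contain something like this (timestamps invented for illustration)…

{
    "stops": [
        { "start": 20993050, "end": 20993120, "complete": 1, "batch": 3 }
    ]
}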