Writing the Python Script

The script you use with the Python module converts an incoming JSON file, created from your graph, into an outgoing JSON file that is delivered to the user.

This 'transformation' can be anything - from a simple calculation to a complete Machine Learning analysis.

How It Works

Your script is called by ARDI and is sent the data from your graph through standard input as a JSON file.

As an example, if you had both a Temperature and a Pressure Python Input node, you'd be given a file like the one below…

{
   "batches": [
        {
             "sharpstart": 0,
             "sharpend": 0,
             "data": [
                   {
                        "time": 20993044,
                        "temperature": 22.5,
                        "pressure": 2912
                   },
                   ...
             ]
        }
   ]
}

The data is broken up into batches, each containing a list of samples. Every sample has a timestamp (in UTC epoch seconds) and a value for each of your inputs.

You can process this data any way you'd like.

Your output should also be a JSON file, which will be merged with the existing analytic query data and returned to the user.
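
For example, a minimal script that reads the graph data from standard input, averages a Temperature input across every sample, and writes the result back out might look like the sketch below. The 'average_temperature' output key is just an illustrative name.

import sys
import json

#Declare the input this script needs (see Inputs below)
#+Temperature

#Read and parse the JSON document that ARDI sends on standard input
content = json.loads(sys.stdin.read())

#Average the temperature across every sample in every batch
total = 0
count = 0
for batch in content['batches']:
    for sample in batch['data']:
        total += sample['temperature']
        count += 1

final = {}
if count > 0:
    final['average_temperature'] = total / count

#Write the JSON result to standard output for ARDI to merge
print(json.dumps(final))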

Inputs

You can specify what inputs your Python script needs using our special comments.

You can also optionally choose the caching method (covered later).
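
As the example scripts below show, each input is declared with a '#+' comment near the top of your script, with an optional Cache line in the same style. For instance (the Temperature and Pressure names here are illustrative):

#+Temperature
#+Pressure

#Optional - choose a caching method (covered later)
#+Cache=stops|batch:int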

Uncached Example Code

import sys
import json

#These are the input channels.
#+Customer
#+Speed

#Read in the standard input
content = sys.stdin.read()

#Convert the input to JSON
content = json.loads(content)

items = []
final = {}

#Process each batch.
for batch in content['batches']:
    customer = None
    maxspeed = None

    #Scan every second of data in the batch
    for n in batch['data']:
        if customer is None:
            #This is the first sample we've seen.
            customer = n['customer']
            maxspeed = n['speed']
        elif n['customer'] == customer:
            #Same customer - track the highest speed so far.
            if n['speed'] > maxspeed:
                maxspeed = n['speed']
        else:
            #The customer has changed - record the previous one.
            thing = {}
            thing['customer'] = customer
            thing['maxspeed'] = maxspeed
            items.append(thing)
            customer = n['customer']
            maxspeed = n['speed']

    #Record the last customer of the batch.
    if customer is not None:
        thing = {}
        thing['customer'] = customer
        thing['maxspeed'] = maxspeed
        items.append(thing)

#Assign the customer records to our output
final['customer_orders'] = items

#Write the JSON formatted final data
print(json.dumps(final))

This code goes through every batch, and through every point of data in that batch.

It then returns the maximum speed found for each consecutive run of samples belonging to the same customer.
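
Given the input format described earlier, the output of this script takes a shape like the one below, which ARDI merges into the analytic query results. The customer names and speeds are placeholder values.

{
   "customer_orders": [
        { "customer": "Acme", "maxspeed": 4.2 },
        { "customer": "Harrison", "maxspeed": 3.8 }
   ]
}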

Caching

If you're not using caching, all of your data will come in as a single 'batch'.

If you would like to take advantage of caching because you're creating events, your data might be broken up into two or more batches - these are the uncached regions of time that need to be filled in.

It is important that you…

  • Prevent any data from carrying over from one batch to the next. You usually do this by clearing any temporary values that persist between loops in your analytic.
  • Add a “Complete” property to each of your events, and only set it to '1' if you're sure that you have captured the entire event rather than just the start or end.
  • Pay attention to the sharpstart and sharpend properties of the batch. A 'sharp' start or end indicates that the batch runs straight into an already-cached time period. If your event is already underway at the start of a batch where sharpstart is 1, and it continues all the way through to the end where sharpend is also 1, the event should be marked as complete.

The code below records each 'stop' event (where the speed is below 0.1 m/s), along with the active batch number at that moment.

Caching Example Code

import sys
import json

#These are the input and cache channels.
#+Batch
#+Speed
#+Cache=stops|batch:int

#Read in the standard input
content = sys.stdin.read()

#Convert the input to JSON
content = json.loads(content)

stoppages = []
final = {}

#Process each batch.
for batch in content['batches']:
    stopstart = None
    first = True

    #Scan every second of data in the batch
    for n in batch['data']:
        if abs(n['speed']) < 0.1:
            if stopstart is None:
                #We've just stopped - start a new event.
                stopstart = {}
                stopstart['start'] = n['time']
                stopstart['complete'] = 0
                stopstart['batch'] = n['batch']

                #The first event might be very incomplete.
                if first and batch['sharpstart'] == 0:
                    stopstart['partial'] = 1
        else:
            if stopstart is not None:
                #Write out the end of the event.
                stopstart['end'] = n['time']
                stopstart['complete'] = 1
                stoppages.append(stopstart)
                stopstart = None

        first = False

    #If we finished with an incomplete event, make sure it's marked incomplete.
    if stopstart is not None:
        stopstart['end'] = batch['data'][-1]['time']
        stopstart['complete'] = 0

        #A sharp end runs straight into already-cached data, so the event
        #can be marked complete unless it was also partial at the start.
        if batch['sharpend'] == 1 and 'partial' not in stopstart:
            stopstart['complete'] = 1

        stoppages.append(stopstart)

#Assign the stoppages to our array
final['stops'] = stoppages
    
#Write the JSON formatted final data
print(json.dumps(final))
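
For reference, the output of this script takes a shape like the one below. The timestamps and batch number are placeholder values.

{
   "stops": [
        { "start": 20993044, "end": 20993101, "complete": 1, "batch": 3 }
   ]
}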