-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathInvokeScripted_hello_world(update_attribute).py
More file actions
61 lines (47 loc) · 1.96 KB
/
InvokeScripted_hello_world(update_attribute).py
File metadata and controls
61 lines (47 loc) · 1.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import sys
import traceback
from org.apache.nifi.processor import Processor
from org.apache.nifi.processor import Relationship
from org.apache.nifi.components import PropertyDescriptor
from org.apache.nifi.processor.util import StandardValidators
class UpdateAttributes(Processor):
    """Scripted NiFi processor that stamps three attributes on a flowfile.

    Each ``onTrigger`` call pulls at most one flowfile from a new session,
    then sets:

    * ``from-property``  -- value of the ``for-attributes`` processor property
    * ``from-attribute`` -- value of the flowfile's ``for-attributes`` attribute
    * ``message``        -- the literal string ``Hello-World``

    and transfers the flowfile to the ``success`` relationship.
    """

    # The only relationship this processor exposes.  The leading double
    # underscore triggers Python name mangling, so the attribute is reachable
    # as ``self.__rel_success`` only from code inside this class body.
    __rel_success = Relationship.Builder().description("Success").name("success").build()

    def __init__(self):
        pass

    def initialize(self, context):
        """Do nothing; this processor needs no initialization."""
        pass

    def getRelationships(self):
        """Return the set containing only the ``success`` relationship."""
        return set([self.__rel_success])

    def validate(self, context):
        """Do nothing; no custom validation is performed."""
        pass

    def getPropertyDescriptors(self):
        """Return the single ``for-attributes`` property descriptor.

        The property is validated with ``NON_EMPTY_VALIDATOR``.
        """
        descriptor = (
            PropertyDescriptor.Builder()
            .name("for-attributes")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build()
        )
        return [descriptor]

    def onPropertyModified(self, descriptor, newValue, oldValue):
        """Do nothing; property changes require no reaction."""
        pass

    def onTrigger(self, context, sessionFactory):
        """Tag one flowfile with the demo attributes and route it to success.

        Args:
            context: process context; the ``for-attributes`` property is read
                from it.
            sessionFactory: factory used to create the session for this call.

        Raises:
            Whatever the session operations raise.  The exception is printed
            to stdout, the session is rolled back, and the exception is then
            re-raised unchanged.
        """
        session = sessionFactory.createSession()
        try:
            # Ensure there is work to do.
            flowfile = session.get()
            if flowfile is None:
                return

            # Extract the values to copy onto the flowfile.
            # NOTE(review): either value may be None when the property or the
            # attribute is not set -- confirm how putAttribute treats that.
            fromPropertyValue = context.getProperty("for-attributes").getValue()
            fromAttributeValue = flowfile.getAttribute("for-attributes")

            # putAttribute returns the updated flowfile, so rebind each time.
            flowfile = session.putAttribute(flowfile, "from-property", fromPropertyValue)
            flowfile = session.putAttribute(flowfile, "from-attribute", fromAttributeValue)
            flowfile = session.putAttribute(flowfile, "message", 'Hello-World')

            session.transfer(flowfile, self.__rel_success)
            session.commit()
        except:
            # Bare except kept on purpose: presumably this must also trap Java
            # throwables surfaced through Jython (TODO confirm).  Nothing is
            # swallowed -- the exception is re-raised below.
            # Single-argument parenthesised print emits the same text under
            # Python 2 / Jython and also parses under Python 3.
            print(sys.exc_info()[0])
            print("Exception in UpdateAttributes:")
            print('-' * 60)
            traceback.print_exc(file=sys.stdout)
            print('-' * 60)
            # Was ``rollback(true)``: ``true`` is not a Python name, so the
            # handler died with NameError before rolling back and hid the
            # original error.
            session.rollback(True)
            raise
# Module-level instance of the scripted processor.  Per the NiFi
# InvokeScriptedProcessor documentation the script must bind its Processor
# implementation to a variable named ``processor``, so do not rename it.
processor = UpdateAttributes()